Luigi allows the definition and execution of tasks that are connected into a graph structure, which represents our workflow.

Each connection within the graph holds 0..n targets, which are produced by the upstream task and may be consumed by the downstream task.

In the above example Target1 is produced by Task1 and consumed by Task2.

 

The definition of a single task involves five methods:

requires(): Defines the upstream tasks
input(): Gives access to the targets produced by the upstream tasks
output(): Defines the targets produced by the given task
run(): Executes the given task
complete(): Checks whether the given task still needs to be run
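
The interplay of the five methods can be sketched without Luigi itself. The following is a simplified stand-in, not Luigi's actual scheduler: ToyTarget, ToyTask and build() are hypothetical names introduced only for this illustration, and the targets live in memory instead of on disk.

```python
# Simplified stand-in for illustration only; this is not Luigi's scheduler.

class ToyTarget:
    """Hypothetical in-memory target; it exists once a value has been written."""

    def __init__(self, name):
        self.name = name
        self.value = None

    def exists(self):
        return self.value is not None


class ToyTask:
    """Hypothetical base class mirroring the five methods listed above."""

    def requires(self):
        return []  # upstream tasks

    def output(self):
        return []  # targets produced by this task

    def input(self):
        # Targets produced by the upstream tasks.
        return [t for task in self.requires() for t in task.output()]

    def complete(self):
        # The task is done once all of its targets exist.
        return all(t.exists() for t in self.output())

    def run(self):
        raise NotImplementedError


class Task1(ToyTask):
    target1 = ToyTarget("Target1")  # class attribute, shared by all instances

    def output(self):
        return [self.target1]

    def run(self):
        self.target1.value = "data from Task1"


class Task2(ToyTask):
    target2 = ToyTarget("Target2")

    def requires(self):
        return [Task1()]

    def output(self):
        return [self.target2]

    def run(self):
        (upstream,) = self.input()  # Target1, produced by Task1
        self.target2.value = upstream.value.upper()


def build(task, log):
    """Run upstream tasks first and skip every task that is already complete."""
    if task.complete():
        return
    for upstream in task.requires():
        build(upstream, log)
    task.run()
    log.append(type(task).__name__)


log = []
build(Task2(), log)  # runs Task1, then Task2
build(Task2(), log)  # nothing to do, Target2 already exists
```

The second call to build() does not run anything, because complete() already reports that Target2 exists. This is the reason why complete() is part of the task definition.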

The following code shows the implementation of Task2. The method input() is inherited from the luigi.Task base class and is therefore not redefined here.

A Generic Luigi Task
import luigi
from luigi.task import flatten


class Task2(luigi.Task):
    task_namespace = 'examples'
    param1 = luigi.IntParameter()
    # Task1, Target2 and Target3 are placeholders from the example graph
    target2 = None
    target3 = None

    def requires(self):
        yield Task1()

    def output(self):
        return [self.target2, self.target3]

    def run(self):
        print("Run Task {task}.".format(task=self.__class__.__name__))
        print("Parameters: (param1: {0})".format(self.param1))
        # produce Target2 and Target3
        self.target2 = Target2(self.param1)
        self.target3 = Target3(self.param1)

    def complete(self):
        # the task is complete once all of its targets exist
        outputs = flatten(self.output())
        return all(map(lambda output: output.exists(), outputs))

 

We have to adapt the over-simplified example above to our FLARECAST infrastructure, where each task represents the run of a Docker container and each target represents the container itself.

The corresponding base class is given as follows:

The RunDocker Base Class
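import time

import luigi


# DockerContainer is the project's own target class; it is not shown on this page
class RunDocker(luigi.Task):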
    imageId = luigi.Parameter(default="")
    imageName = luigi.Parameter(default="")
    __dockerContainer = None

    @property
    def docker_container(self):
        if self.__dockerContainer is None:
            self.__dockerContainer = DockerContainer(str(self.imageId), str(self.imageName))
        return self.__dockerContainer

    @property
    def params(self):
        raise NotImplementedError("This method must be overridden")

    def output(self):
        return self.docker_container

    def run(self):
        self.docker_container.params = self.params

        self.docker_container.run()
        # wait until the container was terminated
        while self.docker_container.is_running:
            time.sleep(1)

    def complete(self):
        ...
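
The run() method above polls is_running once per second and has no upper bound on the waiting time. The following is a minimal sketch of the same wait loop with a timeout. It assumes a hypothetical FakeContainer stand-in, because the real DockerContainer class is not shown on this page; wait_for_exit() and its parameters are likewise names invented for this illustration.

```python
import time


class FakeContainer:
    """Hypothetical stand-in for DockerContainer: reports 'running' for a fixed number of polls."""

    def __init__(self, polls_until_exit):
        self._remaining = polls_until_exit
        self.started = False

    def run(self):
        self.started = True

    @property
    def is_running(self):
        if self._remaining > 0:
            self._remaining -= 1
            return True
        return False


def wait_for_exit(container, poll_interval=1.0, timeout=60.0):
    """Poll container.is_running until it stops; raise TimeoutError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    polls = 0
    while container.is_running:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "container did not terminate within {0}s".format(timeout))
        time.sleep(poll_interval)
        polls += 1
    return polls


container = FakeContainer(polls_until_exit=3)
container.run()
polls = wait_for_exit(container, poll_interval=0.01, timeout=5.0)
```

Whether a timeout is appropriate depends on how long the containers are expected to run; for long-running jobs a generous limit or none at all may be the better choice.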

 

 
