Luigi allows the definition and execution of tasks that are connected into a graph structure (representing our workflow).

Each connection within the graph holds 0..n targets, which are produced by the upstream task and may be consumed by the downstream task.

In the above example Target1 is produced by Task1 and consumed by Task2.

 

The definition of a single task involves five methods:

requires(): Defines the upstream tasks
input(): Gives access to the targets produced by the upstream tasks
output(): Defines the targets produced by the given task
run(): Executes the given task
complete(): Checks whether the given task still needs to be run

The following code shows the implementation of Task2. The method input() is not defined here because it is inherited from the luigi.Task base class.

A Generic Luigi Task
import luigi
from luigi.task import flatten


class Task2(luigi.Task):
    task_namespace = 'examples'
    param1 = luigi.IntParameter()

    def requires(self):
        # Task1 is the upstream task (defined elsewhere)
        yield Task1()

    def output(self):
        # Target2 and Target3 are placeholders for luigi.Target subclasses
        # defined elsewhere
        return [Target2(self.param1), Target3(self.param1)]

    def run(self):
        print("Run Task {task}.".format(task=self.__class__.__name__))
        print("Parameters: (param1: {0})".format(self.param1))
        # produce Target2 and Target3 here, so that their exists() returns True

    def complete(self):
        # the task is complete once all of its output targets exist
        outputs = flatten(self.output())
        return all(map(lambda output: output.exists(), outputs))
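
Task2 can omit input() because the base class derives it from requires(): each upstream task is asked for its output(). The following is a minimal sketch of that idea in plain Python. MiniTask and MiniTarget are hypothetical stand-ins, not Luigi's actual classes, and the real luigi.Task.input() also handles nested structures and generators.

```python
class MiniTarget:
    """Hypothetical stand-in for a luigi.Target: it only knows its name."""

    def __init__(self, name):
        self.name = name


class MiniTask:
    """Hypothetical stand-in for luigi.Task (not Luigi's actual code)."""

    def requires(self):
        return []  # no upstream tasks by default

    def output(self):
        return []  # no targets by default

    def input(self):
        # Collect the output of every upstream task, in order.
        return [task.output() for task in self.requires()]


class Task1(MiniTask):
    def __init__(self):
        self.target1 = MiniTarget("Target1")

    def output(self):
        return self.target1


class Task2(MiniTask):
    def __init__(self, upstream):
        self.upstream = upstream

    def requires(self):
        return [self.upstream]


task1 = Task1()
task2 = Task2(task1)
inputs = task2.input()  # Task2 never defines input() itself
print([target.name for target in inputs])
```

Here Task2 sees Target1 only through the inherited input(), which mirrors the example graph where Target1 is produced by Task1 and consumed by Task2.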

 

We have to adapt the above oversimplified example to our FLARECAST infrastructure, where each task represents the run of a Docker container and each target represents the container itself.

The corresponding base class is given as follows:

The RunDocker Base Class
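import time

import luigi

# DockerContainer (defined elsewhere) is the target that represents the container


class RunDocker(luigi.Task):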
    imageId = luigi.Parameter(default="")
    imageName = luigi.Parameter(default="")
    __dockerContainer = None

    @property
    def docker_container(self):
        if self.__dockerContainer is None:
            self.__dockerContainer = DockerContainer(str(self.imageId), str(self.imageName))
        return self.__dockerContainer

    @property
    def params(self):
        raise NotImplementedError("This method must be overridden")

    def output(self):
        return self.docker_container

    def run(self):
        self.docker_container.params = self.params

        self.docker_container.run()
        # wait until the container has terminated
        while self.docker_container.is_running:
            time.sleep(1)

    def complete(self):
        # SOLUTION1: do all input targets exist? (force a run of all upstream tasks)
        # PROS: No need to check whether a task is still valid; Simple implementation; Follows the concept of Luigi
        # CONS: Runs the full graph; (Always) overwrites existing entries (if not handled by the algorithm)
        ...

        # SOLUTION2: [CHECK->] check within the database if the task was already run &
        #            check whether new* [staging data or properties or predictions] exist within the given time range [time_start, time_end]
        #            * if last_algo_run_timestamp < latest_property_timestamp_with_propertygroup_within_timerange
        # PROS: Early termination of workflow; No need to check whether a task is still valid; Simple implementation; Follows the concept of Luigi
        # CONS: Complex implementation (different checks for different pipeline steps)
        ...

        # SOLUTION3: [<-MEMORY] check within the database if the task was already run AND if its entry is still valid
        # PROS: Early termination of workflow; Simple implementation; Follows the concept of Luigi
        # PLUS: Flag within the database can be used for later visualization, etc.
        # CONS: May only affect prediction and validation algorithms (otherwise resolving the access_history is required, which adds a lot of complexity)
        ...
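
The timestamp comparison named under SOLUTION2 could be sketched as a small pure function. This is only an illustration under stated assumptions: the function name is_complete and its arguments are hypothetical, the timestamps are passed in directly instead of being read from the database, and the property-group filter is left out.

```python
from datetime import datetime


def is_complete(last_algo_run, property_timestamps, time_start, time_end):
    """Return True if the task already ran and no newer data exists in the range."""
    if last_algo_run is None:
        return False  # the task was never run
    # Only entries inside [time_start, time_end] are relevant.
    in_range = [ts for ts in property_timestamps if time_start <= ts <= time_end]
    latest = max(in_range, default=None)
    # New data exists if last_algo_run < latest; otherwise the task is complete.
    return latest is None or last_algo_run >= latest


time_start = datetime(2016, 1, 1)
time_end = datetime(2016, 1, 31)
stamps = [datetime(2016, 1, 5), datetime(2016, 1, 20), datetime(2016, 2, 3)]

never_run = is_complete(None, stamps, time_start, time_end)
stale = is_complete(datetime(2016, 1, 10), stamps, time_start, time_end)
fresh = is_complete(datetime(2016, 1, 25), stamps, time_start, time_end)
print(never_run, stale, fresh)
```

The third call reports the task as complete because the only newer entry (3 February) lies outside the requested time range.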

The most important part of the RunDocker class is the complete() method. This method decides whether a task has to be run or whether Luigi can skip the task and ALL its dependencies.
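
The effect of complete() on the rest of the graph can be illustrated with a toy depth-first runner. This is a simplified sketch, not Luigi's scheduler: DemoTask and build are hypothetical names, and completeness is modeled as a plain flag.

```python
class DemoTask:
    """Hypothetical task: a name, its upstream tasks, and a 'done' flag."""

    def __init__(self, name, upstream=(), done=False):
        self.name = name
        self.upstream = list(upstream)
        self.done = done

    def requires(self):
        return self.upstream

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def build(task, executed):
    """Run `task` and its dependencies depth-first, skipping complete subtrees."""
    if task.complete():
        return  # complete: neither this task nor its upstream tasks are visited
    for dependency in task.requires():
        build(dependency, executed)
    task.run()
    executed.append(task.name)


# Task1 -> Task2 -> Task3, where Task2 is already complete
task1 = DemoTask("Task1")
task2 = DemoTask("Task2", upstream=[task1], done=True)
task3 = DemoTask("Task3", upstream=[task2])

executed = []
build(task3, executed)
print(executed)  # ['Task3']
```

Because Task2 reports itself as complete, Task1 is never visited even though it has not run. This is why the choice among the three solutions matters: a complete() that returns True too eagerly hides stale upstream results.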

 

 
