Luigi allows the definition and execution of tasks that are connected into a graph structure (representing our workflow).
Each connection within the graph holds 0..n targets, which are produced by the upstream task and may be consumed by the downstream task.
In the above example Target1 is produced by Task1 and consumed by Task2.
The definition of a single task involves five methods:
| Method | Description |
| --- | --- |
| requires() | Defines the upstream tasks |
| input() | Gives access to the targets produced by the upstream tasks |
| output() | Defines the targets produced by the given task |
| run() | Executes the given task |
| complete() | Checks whether the given task needs to be run |
The following code shows the implementation of Task2. The input() method is inherited from the luigi.Task base class and is therefore not defined here.
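
    import luigi
    from luigi.task import flatten


    class Task2(luigi.Task):
        task_namespace = 'examples'
        param1 = luigi.IntParameter()

        target2 = None
        target3 = None

        def requires(self):
            yield Task1()

        def output(self):
            return [self.target2, self.target3]

        def run(self):
            print("Run Task {task}.".format(task=self.__class__.__name__))
            print("Parameters: (param1: {0})".format(self.param1))
            # produce Target2 and Target3
            # (Target2 and Target3 are placeholder target classes; like Task1, they are not shown here)
            self.target2 = Target2(self.param1)
            self.target3 = Target3(self.param1)

        def complete(self):
            # flatten() drops None entries, and all() of an empty list is True,
            # so the task counts as incomplete until run() has created its targets
            outputs = flatten(self.output())
            return bool(outputs) and all(output.exists() for output in outputs)
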
We have to adapt the oversimplified example above to our FLARECAST infrastructure, where each task represents the run of a Docker container and each target represents the container itself.
The corresponding base class is given as follows:

    import time

    import luigi


    class RunDocker(luigi.Task):
        imageId = luigi.Parameter(default="")
        imageName = luigi.Parameter(default="")

        __dockerContainer = None

        @property
        def docker_container(self):
            # DockerContainer is the target class representing the container (not shown here)
            if self.__dockerContainer is None:
                self.__dockerContainer = DockerContainer(str(self.imageId), str(self.imageName))
            return self.__dockerContainer

        @property
        def params(self):
            raise NotImplementedError("This method must be overridden")

        def output(self):
            return self.docker_container

        def run(self):
            self.docker_container.params = self.params
            self.docker_container.run()
            # wait until the container has terminated
            while self.docker_container.is_running:
                time.sleep(1)

        def complete(self):
            # SOLUTION1: do all input targets exist? (forces a run of all upstream tasks)
            #   PROS: No need to check whether a task is still valid; simple implementation;
            #         follows the concept of Luigi
            #   CONS: Runs the full graph; (always) overwrites existing entries
            #         (if not handled by the algorithm); ...
            # SOLUTION2: [CHECK->] check within the database if the task was already run and
            #   check whether new* [staging data or properties or predictions] exist
            #   within the given time range [time_start, time_end]
            #   * if last_algo_run_timestamp < latest_property_timestamp_with_propertygroup_within_timerange
            #   PROS: Early termination of workflow; no need to check whether a task is still valid;
            #         simple implementation; follows the concept of Luigi
            #   CONS: Complex implementation (different checks for different pipeline steps) ...
            # SOLUTION3: [<-MEMORY] check within the database if the task was already run AND
            #   if its entry is still valid
            #   PROS: Early termination of workflow; simple implementation;
            #         follows the concept of Luigi
            #   PLUS: Flag within the database can be used for later visualization, etc.
            #   CONS: May only affect prediction and validation algorithms (otherwise resolving
            #         the access_history is required, which adds a lot of complexity) ...
            # Placeholder: none of the solutions above is implemented here.
            raise NotImplementedError("No completeness check has been chosen yet")

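The check described under SOLUTION2 in the comments above could look roughly like the following sketch. It is an illustration only, not the FLARECAST implementation: the function name `needs_rerun`, the record layout (a pair of observation time and database write time) and the way the timestamps are obtained are all assumptions made for this example. A real implementation would query these values from the database.

```python
from datetime import datetime
from typing import Iterable, Optional, Tuple

# Hypothetical record layout: (observation_time, written_at)
Record = Tuple[datetime, datetime]


def needs_rerun(last_algo_run: Optional[datetime],
                records: Iterable[Record],
                time_start: datetime,
                time_end: datetime) -> bool:
    """Return True if the algorithm has to run (again) for the time range."""
    if last_algo_run is None:
        # The task was never run before.
        return True
    # Keep the write times of all records observed within [time_start, time_end].
    written = [w for (observed, w) in records if time_start <= observed <= time_end]
    if not written:
        # No input data in the range, so there is nothing new to process.
        return False
    # New data exists if the latest write happened after the last run.
    return last_algo_run < max(written)


start, end = datetime(2017, 1, 1), datetime(2017, 1, 31)
records = [(datetime(2017, 1, 10), datetime(2017, 2, 1))]
print(needs_rerun(datetime(2017, 1, 15), records, start, end))  # True
```

Under these assumptions, complete() would return the negation of this check, so that a task with no new input data is reported as complete.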
The most important part of the RunDocker class is the complete() method. This method decides whether a task has to be run or whether Luigi can skip the task and ALL its dependencies.
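To illustrate why a complete task also hides its dependencies, here is a toy model of the traversal. It is a deliberately simplified stand-in and not Luigi's actual scheduler: `ToyTask`, `build` and the task names are hypothetical and exist only for this sketch.

```python
class ToyTask:
    """Stand-in for luigi.Task with only the parts needed here."""

    def __init__(self, name, done, upstream=()):
        self.name = name
        self.done = done
        self.upstream = list(upstream)

    def requires(self):
        return self.upstream

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def build(task, executed):
    """Run `task` and its dependencies depth-first, skipping complete tasks."""
    if task.complete():
        # A complete task is skipped, and its upstream tasks are never visited.
        return
    for dep in task.requires():
        build(dep, executed)
    task.run()
    executed.append(task.name)


ingest = ToyTask("ingest", done=False)
extract = ToyTask("extract", done=True, upstream=[ingest])
predict = ToyTask("predict", done=False, upstream=[extract])

executed = []
build(predict, executed)
print(executed)  # ['predict']
```

Although `ingest` is not complete, it is never run, because `extract` reports itself as complete and the traversal stops there. This is why the choice of completeness check determines how much of the graph is executed.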