Luigi allows the definition and execution of Tasks that are connected to form a graph structure (representing our workflow).
Each connection within the graph carries 0..n targets, which are produced by the upstream task and may be consumed by the downstream task.
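As a rough illustration of this structure, the following plain-Python toy model records, for each task, the tasks it requires and the targets it produces, then walks the connections of the graph. It is not Luigi's API: `ToyTask` and `edges` are hypothetical names, and targets are reduced to file names. In Luigi itself these roles are played by a task's `requires()` and `output()` methods.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Hypothetical stand-in for a Luigi task: a name, the tasks it
    requires (upstream) and the targets it produces."""
    name: str
    requires: list = field(default_factory=list)
    outputs: list = field(default_factory=list)


def edges(task):
    """Yield (upstream, downstream, targets) for every connection reachable
    from `task`; each connection carries the 0..n targets produced upstream."""
    for up in task.requires:
        yield up.name, task.name, list(up.outputs)
        yield from edges(up)


extract = ToyTask("Extract", outputs=["raw.csv"])
clean = ToyTask("Clean", requires=[extract], outputs=["clean.csv", "rejects.csv"])
notify = ToyTask("Notify")  # produces no target at all
report = ToyTask("Report", requires=[clean, notify], outputs=["report.html"])

for up, down, targets in edges(report):
    print(f"{up} -> {down}: {targets}")
```

The `Notify -> Report` connection carries an empty list of targets, which is the `0` end of the `0..n` range, while `Clean -> Report` carries two.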
...
```python
import time

import luigi

# DockerContainer is the project's own wrapper around a Docker container
# (not shown here). Since output() returns it, it is assumed to behave
# as a luigi.Target.


class RunDocker(luigi.Task):
    image_id = luigi.Parameter(default="")
    image_name = luigi.Parameter(default="")

    # Lazily created container handle, cached per task instance
    __docker_container = None

    @property
    def docker_container(self):
        if self.__docker_container is None:
            self.__docker_container = DockerContainer(str(self.image_id), str(self.image_name))
        return self.__docker_container

    @property
    def params(self):
        # Concrete subclasses supply the container parameters
        raise NotImplementedError("This method must be overridden")

    def output(self):
        return self.docker_container

    def run(self):
        self.docker_container.params = self.params
        self.docker_container.run()
        # wait until the container terminates
        while self.docker_container.is_running:
            time.sleep(1)

    def complete(self):
        ...
```
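The loop at the end of `run()` polls `is_running` once per second with no upper bound, so a container that never stops keeps the task busy forever. A bounded variant could look like the sketch below. It assumes only that the container object exposes an `is_running` property, as `run()` already relies on; `FakeContainer` and `wait_until_stopped` are hypothetical helpers for illustration, not part of the original code.

```python
import time


class FakeContainer:
    """Hypothetical stand-in exposing only the `is_running` property that
    RunDocker.run() polls; it reports 'stopped' after a number of checks."""

    def __init__(self, checks_until_done):
        self._remaining = checks_until_done

    @property
    def is_running(self):
        if self._remaining > 0:
            self._remaining -= 1
            return True
        return False


def wait_until_stopped(container, poll_interval=1.0, timeout=None):
    """Poll `container.is_running` like RunDocker.run() does, but give up
    after `timeout` seconds (None means wait forever).
    Returns True if the container stopped, False on timeout."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while container.is_running:
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True


print(wait_until_stopped(FakeContainer(3), poll_interval=0.01))  # True
print(wait_until_stopped(FakeContainer(10**6), poll_interval=0.01, timeout=0.05))  # False
```

Whether a timeout should fail the task (for example by raising inside `run()`) or simply be logged is a design decision left open here.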
The most important part of the RunDocker class is the complete() method. This method decides whether a Task has to be run or whether Luigi can skip the task and ALL its dependencies.
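The skipping behaviour can be modelled with a small stand-in scheduler. This is only a sketch: `StubTask` and `build` are hypothetical, the `done` flag stands in for whatever check `complete()` performs, and Luigi's real scheduler is not a simple recursive walk. It does show the consequence described above: once a task reports itself complete, its requirements are never visited, even if one of them is not complete.

```python
class StubTask:
    """Hypothetical stand-in for a Luigi task (not Luigi's API)."""

    def __init__(self, name, requires=(), done=False):
        self.name = name
        self.requires = list(requires)
        self.done = done  # stands in for "the task's outputs already exist"

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def build(task, log):
    """Depth-first build: a complete task is skipped and its requirements
    are not visited; otherwise requirements are built first, then the task."""
    if task.complete():
        log.append(f"skip {task.name}")
        return
    for dep in task.requires:
        build(dep, log)
    task.run()
    log.append(f"run {task.name}")


extract = StubTask("Extract")  # not complete
clean = StubTask("Clean", requires=[extract], done=True)
report = StubTask("Report", requires=[clean])

log = []
build(report, log)
print(log)           # ['skip Clean', 'run Report']
print(extract.done)  # False: Extract was never visited
```

An inaccurate `complete()` therefore affects not just one task but the whole subgraph beneath it.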
...
As solutions 2 and 3 introduce considerably more complexity into our infrastructure, we would like to avoid them. Solution 1, however, has the serious disadvantage that it may invalidate previously computed results.
We therefore introduce the following renewal:
...