Luigi allows the definition and execution of tasks that are connected into a graph structure representing our workflow.
Each connection within the graph carries 0..n targets, which are produced by the upstream task and may be consumed by the downstream task.
In the example above, Target1 is produced by Task1 and consumed by Task2.
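The relationship between tasks, connections and targets can be modelled with plain data structures. The following sketch does not use Luigi, and its dictionary layout is purely illustrative: it derives an execution order from the "requires" relations and lists the targets carried by each connection. "Cleanup" is a hypothetical extra task that depends on Task2 without consuming any of its targets, to show a connection that carries zero targets.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Illustrative workflow description (not a Luigi data structure).
# "Cleanup" is hypothetical: it requires Task2 but consumes nothing from it.
TASKS = {
    "Task1": {"requires": [], "produces": ["Target1"], "consumes": []},
    "Task2": {"requires": ["Task1"], "produces": ["Target2", "Target3"], "consumes": ["Target1"]},
    "Cleanup": {"requires": ["Task2"], "produces": [], "consumes": []},
}

# Each connection (upstream, downstream) carries the consumed targets that the
# upstream task produces; the list may be empty (0..n targets).
edges = {
    (upstream, name): [t for t in spec["consumes"] if t in TASKS[upstream]["produces"]]
    for name, spec in TASKS.items()
    for upstream in spec["requires"]
}

# Upstream tasks always come before the tasks that require them.
order = list(TopologicalSorter({name: spec["requires"] for name, spec in TASKS.items()}).static_order())

print(order)  # ['Task1', 'Task2', 'Cleanup']
print(edges)  # {('Task1', 'Task2'): ['Target1'], ('Task2', 'Cleanup'): []}
```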
The definition of a single task involves five methods:
| Method | Purpose |
|---|---|
| requires() | Defines the upstream tasks |
| input() | Gives access to the targets produced by the upstream tasks |
| output() | Defines the targets produced by the given task |
| run() | Executes the given task |
| complete() | Checks whether the given task has already been completed, i.e. whether it still needs to be run |
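To make the interplay of the five methods concrete, here is a dependency-free toy model. It is not Luigi's code, and ToyTarget, ToyTask and build() are hypothetical names. It mirrors two behaviours of Luigi's scheduling: upstream tasks run before the tasks that require them, and a task whose complete() returns True is skipped.

```python
class ToyTarget:
    """Stand-in for a target: something that either exists or does not."""
    def __init__(self, name):
        self.name = name
        self.created = False

    def exists(self):
        return self.created


# Targets live outside the task instances, so a fresh instance sees the same ones.
TARGETS = {name: ToyTarget(name) for name in ("Target1", "Target2", "Target3")}


class ToyTask:
    """Simplified mirror of the five-method contract (hypothetical, not luigi.Task)."""
    def requires(self):
        return []

    def input(self):
        # The outputs of the upstream tasks.
        return [task.output() for task in self.requires()]

    def output(self):
        return []

    def run(self):
        raise NotImplementedError

    def complete(self):
        # A task without outputs counts as incomplete, like Luigi's default complete().
        outputs = self.output()
        return bool(outputs) and all(target.exists() for target in outputs)


class Task1(ToyTask):
    def output(self):
        return [TARGETS["Target1"]]

    def run(self):
        TARGETS["Target1"].created = True


class Task2(ToyTask):
    def requires(self):
        return [Task1()]

    def output(self):
        return [TARGETS["Target2"], TARGETS["Target3"]]

    def run(self):
        consumed = [target for outputs in self.input() for target in outputs]
        if not all(target.exists() for target in consumed):
            raise RuntimeError("upstream targets are missing")
        for target in self.output():
            target.created = True


def build(task, log):
    """Run upstream tasks first; skip any task that is already complete."""
    if task.complete():
        return
    for upstream in task.requires():
        build(upstream, log)
    task.run()
    log.append(type(task).__name__)


log = []
build(Task2(), log)
build(Task2(), log)  # everything is complete now, so nothing runs again
print(log)  # ['Task1', 'Task2']
```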
The following code shows the implementation of Task2. Here, the method input() is inherited from the luigi.Task base class.
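```python
import luigi
from luigi.task import flatten


class Task2(luigi.Task):
    task_namespace = 'examples'

    param1 = luigi.IntParameter()

    # Targets produced by this task (Task1, Target2 and Target3 are defined elsewhere)
    target2 = None
    target3 = None

    def requires(self):
        # Task1 produces Target1, which this task consumes via self.input()
        yield Task1()

    def output(self):
        return [self.target2, self.target3]

    def run(self):
        print("Run Task {task}.".format(task=self.__class__.__name__))
        print("Parameters: (param1: {0})".format(self.param1))
        # produce Target2 and Target3
        self.target2 = Target2(self.param1)
        self.target3 = Target3(self.param1)

    def complete(self):
        # flatten() drops None entries, so an empty list means that the
        # targets have not been created yet and the task must still run
        outputs = flatten(self.output())
        return bool(outputs) and all(output.exists() for output in outputs)
```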
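Because output() in the example above returns attributes that are only assigned inside run(), a fresh Task2 instance cannot report itself complete. In Luigi, outputs are commonly derived from the task's parameters instead, so that completeness can be checked before run() is ever called. The sketch below illustrates this without Luigi: STORE, KeyTarget and Task2Sketch are hypothetical, and flatten() is a simplified local stand-in for luigi.task.flatten.

```python
# Hypothetical registry standing in for real storage (files, database rows, ...).
STORE = set()


class KeyTarget:
    """Minimal target: exists() reports whether its key has been written."""
    def __init__(self, key):
        self.key = key

    def exists(self):
        return self.key in STORE

    def write(self):
        STORE.add(self.key)


def flatten(struct):
    """Simplified stand-in for luigi.task.flatten (None, dicts, lists, tuples)."""
    if struct is None:
        return []
    if isinstance(struct, dict):
        struct = list(struct.values())
    if isinstance(struct, (list, tuple)):
        return [item for element in struct for item in flatten(element)]
    return [struct]


class Task2Sketch:
    def __init__(self, param1):
        self.param1 = param1

    def output(self):
        # Derived from the parameter, so it is known before run() is called.
        return [KeyTarget(f"target2-{self.param1}"), KeyTarget(f"target3-{self.param1}")]

    def run(self):
        for target in self.output():
            target.write()

    def complete(self):
        outputs = flatten(self.output())
        return bool(outputs) and all(output.exists() for output in outputs)


print(Task2Sketch(1).complete())  # False
Task2Sketch(1).run()
print(Task2Sketch(1).complete())  # True: a new instance sees the same targets
print(Task2Sketch(2).complete())  # False: different parameter, different targets
```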
We have to adapt the above over-simplified example to our FLARECAST infrastructure, where each task represents the run of a Docker container and each target represents the container itself.
The corresponding base class is given as follows:
```python
import time

import luigi


class RunDocker(luigi.Task):
    imageId = luigi.Parameter(default="")
    imageName = luigi.Parameter(default="")

    __dockerContainer = None

    @property
    def docker_container(self):
        # create the container wrapper on first access and reuse it afterwards
        # (DockerContainer is part of the FLARECAST code base, not shown here)
        if self.__dockerContainer is None:
            self.__dockerContainer = DockerContainer(str(self.imageId), str(self.imageName))
        return self.__dockerContainer

    @property
    def params(self):
        # subclasses supply the parameters passed to the container
        raise NotImplementedError("This method must be overridden")

    def output(self):
        return self.docker_container

    def run(self):
        self.docker_container.params = self.params
        self.docker_container.run()
        # wait until the container has terminated
        while self.docker_container.is_running:
            time.sleep(1)

    def complete(self):
        ...  # implementation not shown
```
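RunDocker's run() blocks by polling is_running once per second. The sketch below reproduces that shape without Luigi or Docker so it can be tried in isolation. FakeContainer, RunContainerSketch and ExampleTask are hypothetical names; the fake container simply reports "running" for a fixed number of polls, and the optional timeout is an addition that RunDocker itself does not have.

```python
import time


class FakeContainer:
    """Hypothetical stand-in for DockerContainer: 'runs' for a fixed number of polls."""
    def __init__(self, image_id, image_name, polls_until_done=3):
        self.image_id = image_id
        self.image_name = image_name
        self.params = None
        self._polls_until_done = polls_until_done
        self._remaining = None

    def run(self):
        self._remaining = self._polls_until_done

    @property
    def is_running(self):
        if not self._remaining:
            return False
        self._remaining -= 1
        return True


class RunContainerSketch:
    """Mirrors RunDocker's shape: lazy container, abstract params, blocking run."""
    def __init__(self, image_id="", image_name="", poll_interval=1.0, timeout=None):
        self.image_id = image_id
        self.image_name = image_name
        self.poll_interval = poll_interval
        self.timeout = timeout
        self._container = None

    @property
    def docker_container(self):
        # Created on first access and reused afterwards.
        if self._container is None:
            self._container = FakeContainer(str(self.image_id), str(self.image_name))
        return self._container

    @property
    def params(self):
        raise NotImplementedError("This method must be overridden")

    def run(self):
        self.docker_container.params = self.params
        self.docker_container.run()
        started = time.monotonic()
        polls = 0
        # Block until the container reports that it has terminated.
        while self.docker_container.is_running:
            if self.timeout is not None and time.monotonic() - started > self.timeout:
                raise TimeoutError(f"container {self.image_name!r} is still running")
            polls += 1
            time.sleep(self.poll_interval)
        return polls


class ExampleTask(RunContainerSketch):
    """Hypothetical concrete task: it only has to supply its container parameters."""
    @property
    def params(self):
        return {"param1": 42}


task = ExampleTask(image_id="abc", image_name="example", poll_interval=0.01, timeout=5)
print(task.run())                    # 3
print(task.docker_container.params)  # {'param1': 42}
```

In this shape a concrete task only overrides params, as ExampleTask does. Without a timeout, a container that never terminates would block the worker indefinitely, which is why the sketch makes one available.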