Luigi allows the definition and execution of tasks that are connected into a graph structure (representing our workflow).
Each connection within the graph carries 0..n targets which are produced by the upstream task and (possibly) consumed by the downstream task.
In the above example Target1 is produced by Task1 and consumed by Task2.
The definition of a single task involves five methods:
| Method | Description |
|---|---|
| requires() | Defines the upstream tasks |
| input() | Gives access to the targets produced by the upstream tasks |
| output() | Defines the targets produced by the given task |
| run() | Executes the given task |
| complete() | Checks whether the given task needs to be run |
The following code shows the implementation of Task2. The method input() is inherited from the luigi.Task base class.
```python
import luigi
from luigi.task import flatten


class Task2(luigi.Task):
    param1 = luigi.IntParameter()

    target2 = None
    target3 = None

    def requires(self):
        yield Task1()

    def output(self):
        return [self.target2, self.target3]

    def run(self):
        print("Run Task {task}.".format(task=self.__class__.__name__))
        print("Parameters: (param1: {0})".format(self.param1))
        # produce Target2 and Target3
        self.target2 = Target2(self.param1)
        self.target3 = Target3(self.param1)

    def complete(self):
        outputs = flatten(self.output())
        return all(map(lambda output: output.exists(), outputs))
```
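The completeness check at the end of Task2 can be illustrated without a running Luigi instance. In the following sketch, FakeTarget and flatten are minimal stand-ins for Luigi's targets and luigi.task.flatten:

```python
class FakeTarget:
    """Stand-in for a luigi target; real targets implement exists()."""
    def __init__(self, exists):
        self._exists = exists

    def exists(self):
        return self._exists


def flatten(struct):
    """Minimal stand-in for luigi.task.flatten: turn a (possibly
    nested) structure of targets into a flat list."""
    if isinstance(struct, (list, tuple)):
        return [t for sub in struct for t in flatten(sub)]
    return [struct]


def is_complete(outputs):
    # a task is complete when every one of its output targets exists
    return all(target.exists() for target in flatten(outputs))


print(is_complete([FakeTarget(True), FakeTarget(True)]))    # -> True
print(is_complete([FakeTarget(True), [FakeTarget(False)]])) # -> False
```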
We have to adapt the over-simplified example above for our FLARECAST infrastructure, where each task represents the run of a docker container and each target represents the container itself.
The corresponding base class is given as follows:
```python
import time

import luigi


class RunDocker(luigi.Task):
    image_id = luigi.Parameter(default="")
    image_name = luigi.Parameter(default="")

    __docker_container = None

    @property
    def docker_container(self):
        if self.__docker_container is None:
            self.__docker_container = DockerContainer(str(self.image_id),
                                                      str(self.image_name))
        return self.__docker_container

    @property
    def params(self):
        raise NotImplementedError("This method must be overridden")

    def output(self):
        return self.docker_container

    def run(self):
        self.docker_container.params = self.params
        self.docker_container.run()
        # wait until the container terminates
        while self.docker_container.is_running:
            time.sleep(1)

    def complete(self):
        ...
```
The most important part of the RunDocker class is the complete() method. It decides whether a task has to be run or whether Luigi can skip the task and ALL its dependencies.
A problem occurs if we have new or modified data within our database. Let us assume we first run a workflow, afterwards manipulate some data within the staging table, and then run our workflow again. The second time, Luigi checks from the last task upwards whether it is complete or not. If this last task represents a prediction algorithm, its complete() method will return True, as the task does not yet know that its input data will be changed by a preceding task. Luigi would have to traverse the whole graph upwards to the task which actually manipulates the data and provides new input data for all succeeding tasks.
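The traversal behaviour described above can be sketched in plain Python. Task here is a hypothetical stand-in for luigi.Task, and the real scheduler is more involved, but the skipping rule is the same: dependencies of a complete task are never visited.

```python
class Task:  # hypothetical stand-in for luigi.Task
    def __init__(self, name, deps=(), is_complete=False):
        self.name = name
        self.deps = deps
        self.is_complete = is_complete

    def complete(self):
        return self.is_complete

    def requires(self):
        return self.deps


def tasks_to_run(task):
    """Walk the graph from the final task upwards, as described above."""
    if task.complete():
        return []  # the task and ALL its dependencies are skipped
    runs = []
    for dep in task.requires():
        runs.extend(tasks_to_run(dep))
    runs.append(task)
    return runs


# the staging data changed, but the prediction task still looks complete:
staging = Task("staging", is_complete=False)
features = Task("features", deps=[staging], is_complete=True)
prediction = Task("prediction", deps=[features], is_complete=True)

print([t.name for t in tasks_to_run(prediction)])  # -> []
```

Even though the staging task would report itself as incomplete, it is never reached, which is exactly the problem discussed here.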
In the following, we present three solutions to this problem:
| Solution | Description | Advantages | Disadvantages |
|---|---|---|---|
| 1 | [always re-run the workflow] Check whether all input targets exist. As a newly instantiated task does not know about its related docker container, it is never considered complete. | We do not check whether new data are available within the database. It provides a simple implementation. | The full graph has to be executed for each run. It does not follow the concept of Luigi. Existing data are always overwritten, which makes earlier results irreproducible. |
| 2 | [check for invalid runs on the fly - JIT approach] Check whether an algorithm was successfully run, using the algorithm_run table. Which data to consider depends on the algorithm type, e.g. feature extraction. | An early termination of the graph is possible. It follows the concept of Luigi. Compared with solution (3), we can check whether a run is still valid without additional bookkeeping. | It requires a more complex implementation (e.g. the differentiation of algorithm types and their input data). |
| 3 | [invalidate algorithm runs while overwriting data - MEMORY approach] Check whether an algorithm was successfully run AND is still valid. While overwriting old data within the database, evaluate and flag all dependent algorithm runs. | An early termination of the graph is possible. It follows the concept of Luigi. Invalid algorithm runs are flagged, which may be useful for post-analysis. | It requires a more complex implementation. We cannot check whether a data loader algorithm is still valid. |
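Solution 1 can be sketched in isolation. This is a hypothetical illustration, not the FLARECAST implementation: the only completeness evidence is the in-memory container reference, so a fresh process always re-runs everything.

```python
class AlwaysRerunTask:
    """Hypothetical sketch of solution 1: a freshly instantiated task
    has no docker container yet, so complete() returns False and the
    whole graph is re-executed on every workflow run."""

    def __init__(self):
        self._container = None  # set only after run()

    def run(self):
        self._container = object()  # stand-in for the docker container

    def complete(self):
        # nothing is checked against the database; a new process can
        # therefore never skip this task
        return self._container is not None
```

The simplicity is the advantage listed in the table; the price is that every run overwrites existing data.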
As solutions 2 and 3 introduce considerably more complexity within our infrastructure, we would like to avoid them. Solution 1, however, has the serious disadvantage that it may invalidate earlier computed results.
We therefore introduce the following refinement:
We provide two datasets, one with actual data and one with data older than 17 days. We do NOT allow any manual modification of the first dataset and expect that master data, e.g. SWPC data, older than 17 days is stable and does not change anymore.
While running a workflow every 6 hours on the actual data, we know that only the newest prediction is accurate - all others are outdated and their input data cannot be reconstructed. As no manual modification of the underlying data is allowed, the newest prediction is always accurate. Concerning the historical data, we do not need to worry about corrupting our results, as the master data, and thereby the resulting properties, will not change anymore.
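The dataset split can be sketched as a simple routing rule. This is a hypothetical helper (the function name and return labels are ours); only the 17-day threshold comes from the text above:

```python
from datetime import datetime, timedelta

STABLE_AFTER = timedelta(days=17)  # threshold stated above


def dataset_for(record_time, now):
    """Route a record to the stable historical dataset once its master
    data (e.g. SWPC data) is older than 17 days; otherwise keep it in
    the mutable 'actual' dataset."""
    return "historical" if now - record_time > STABLE_AFTER else "actual"


now = datetime(2016, 6, 20)
print(dataset_for(datetime(2016, 6, 1), now))   # -> historical
print(dataset_for(datetime(2016, 6, 19), now))  # -> actual
```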