Luigi allows the definition and execution of Tasks which are connected towards a graph structure (representing our workflow).

 

Each connection within the graph holds 0..n targets which are produced by the upstream task and (may be) consumed by the downstream task.

In the above example Target1 is produced by Task1 and consumed by Task2.

 

The definition of a single task involves five methods:

requires()Defines the upstream tasks
input()Gives access to the targets produced by the upstream tasks
output()Defines the targets produced by the given task
run()Executes the given task
complete()Checks wherever the given task needs to be run

The following code shows the implementation of Task2. Hereby, the method input() is inherited from the luigi.Task base class.

class Task2(luigi.Task):
    param1 = luigi.IntParameter()
    task2 = None
    task3 = None

def requires(self):
    yield Task1()

def output(self):
    return [this.task2, this.task3]

def run(self):
    print("Run Task {task}.".format(task=self.__class__.__name__))
    print("Parameters: (param1: {0})".format(self.param1))
    # produce Target2 and Target3
    this.task2 = Task2(param1)
    this.task3 = Task3(param1)

def complete(self):
    outputs = flatten(self.output())
    return all(map(lambda output: output.exists(), outputs))

 

We have to adapt the above over-simplified example for our FLARECAST infrastructure where each task represents the run of a docker container and each target represents the container itself.

The corresponding base class is given as follows:

class RunDocker(luigi.Task):
    image_id = luigi.Parameter(default="")
    image_name = luigi.Parameter(default="")
    __docker_container = None

    @property
    def docker_container(self):
        if self.__docker_container is None:
            self.__docker_container = DockerContainer(str(self.image_id), str(self.image_name))
        return self.__docker_container

    @property
    def params(self):
        raise NotImplementedError("This method must be overridden")

    def output(self):
        return self.docker_container

    def run(self):
        self.docker_container.params = self.params

        self.docker_container.run()
        # wait until the container terminates
        while self.docker_container.is_running:
            time.sleep(1)

    def complete(self):
        ...

 

The most important part of the RunDocker class is the complete() method. This method decides wherever a Task has to be run or if Luigi can skip the task and ALL its dependencies.

A problem occurs if we have new or modified data within our database. Let us assume we first run a workflow, afterwards manipulate some data within the staging table and run our workflow again. The second time we run the workflow Luigi will go check from the last task upwards wherever it is complete or not. If this task represents a prediction algorithm its complete() method will return True as the task not yet knows as its input data will change by a preceding task. Luigi would have to traverse all the graph upwards towards the task which finally manipulates the data and provides new input data for all succeed tasks.

Following, we present three solutions to this problem:

SolutionDescriptionAdvantagesDisadvantages
1

[always re-run the workflow]

Check wherever all input targets exists.

As a new instantiated task do not know about its related docker container
its output targets are all inexistent. For that reason, all tasks are incomplete
for Luigi the first place and all tasks will be executed.

We do not check wherever new data are available within the database

It provides a simple implementation

The full graph has to be executed for each run

It does not follow the concept of Luigi

Existing data are always overwritten which makes results,
e.g. predictions, based on the old data invalid

2

[check for invalid runs on the fly - JIT approach]

Check wherever an algorithm was successfully ran, using the algorithm_run
table within the database. Furthermore, check wherever new data exist
within the given time range to proceed [start_time, end_time].

Which data to concern depends on the algorithm type, e.g. feature extraction
or prediction algorithm. As an example for a prediction algorithm, one could
verify wherever the latest creation datetime of a property within a property
group used as input data for the algorithm is older than the timestamp from
the last successful ran of the algorithm stored within the database. If so, the
last run is outdated and we have to re-run the task.

An early termination of the graph is possible

It does follow the concept of Luigi

Comparatively with solution (3) we can check wherever a run is
invalid on the fly, without the need of an invalidation flag

 

It requires a more complex implementation
(e.g. the differentiation of algorithm types and their
input data)
3

[invalidate algorithms while overwriting data - MEMORY approach]

Check wherever an algorithm was successfully run AND is still valid,
using the algorithm_run table within the database.

While overwriting old data within the database evaluate all dependent
algorithm runs and set its state as invalid. This is especially cumbersome
for algorithms which inputs are only given by the access_history table, e.g.
ML algorithms which store their trained parameters as an algorithm
configuration. Hereby, the access_history query has to be re-executed
and verified wherever a newly added property is included.

An early termination of the graph is possible

It does follow the concept of Luigi

Invalid algorithm runs are flagged which may be useful for post-
processing steps, e.g. the visualization of valid predictions

It requires a more complex implementation
(e.g. resolving property groups from the access_history)

We cannot check wherever a data loader algorithm is
invalid or not as we do not have a relation between
data within the staging table and the algorithm which
added/modified the data

As solution 2 and 3 introduces a lot more complexity within our infrastructure we would like to avoid them. Solution 1 however has the serious disadvantage, as it may invalidate early computed results.

 

We therefore introduce the following renewal:

We provide two datasets, one with actual data and one with data older than 17 days. Hereby, we do NOT allow any manual modification on the first dataset and expect, as master data, e.g. SWPC data, older than 17 days are stable and do not change anymore.

While running a workflow each 6 hours on the actual data we know as only the newest prediction is accurate - all others are out-dated and their input-data cannot be reconstructed. As no modification is allowed we know as the newest prediction is always accurate and no manual modification of the underlying data happened. Concerning the historical data, we do not need to worry about corrupting our data as the master data and hereby the resulting properties will not change anymore.