
Code Block
languagepy
titleA Generic Luigi Task
linenumberstrue
import luigi
from luigi.task import flatten


class Task2(luigi.Task):
    task_namespace = 'examples'
    param1 = luigi.IntParameter()
    task2 = None
    task3 = None

    def requires(self):
        yield Task1()

    def output(self):
        return [self.task2, self.task3]

    def run(self):
        print("Run Task {task}.".format(task=self.__class__.__name__))
        print("Parameters: (param1: {0})".format(self.param1))
        # produce Target2 and Target3
        self.task2 = Target2(self.param1)
        self.task3 = Target3(self.param1)

    def complete(self):
        # the task is complete once all of its output targets exist
        outputs = flatten(self.output())
        return all(map(lambda output: output.exists(), outputs))
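
Such a task graph is typically triggered via luigi.build. The following is a minimal sketch; the parameter value and the use of the local scheduler are only illustrative:

Code Block
languagepy
titleRunning the Example Task (Sketch)
import luigi

# build and run the task graph with the local scheduler; Luigi resolves
# requires() upstream and skips every task whose complete() returns True
luigi.build([Task2(param1=42)], local_scheduler=True)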

...

Code Block
languagepy
titleThe RunDocker Base Class
import time

import luigi


class RunDocker(luigi.Task):
    image_id = luigi.Parameter(default="")
    image_name = luigi.Parameter(default="")
    __docker_container = None

    @property
    def docker_container(self):
        # create the DockerContainer wrapper lazily on first access
        if self.__docker_container is None:
            self.__docker_container = DockerContainer(str(self.image_id), str(self.image_name))
        return self.__docker_container

    @property
    def params(self):
        # the parameters handed to the docker container;
        # every concrete subclass has to provide them
        raise NotImplementedError("This method must be overridden")

    def output(self):
        return self.docker_container

    def run(self):
        self.docker_container.params = self.params

        self.docker_container.run()
        # wait until the container terminates
        while self.docker_container.is_running:
            time.sleep(1)
            time.sleep(1)

    def complete(self):
        # SOLUTION 1: do all input targets exist? (force a run of all upstream tasks)
        # PROS: no need to check whether a task is still valid; simple implementation
        # CONS: runs the full graph; (always) overwrites existing entries (if not handled by the algorithm)
        ...

        # SOLUTION 2: [CHECK->] check within the database whether the task was already run AND
        #             check whether new* [staging data or properties or predictions] exist within the given time range [time_start, time_end]
        #             * if last_algo_run_timestamp < latest_property_timestamp_with_propertygroup_within_timerange
        # PROS: early termination of the workflow; no need to check whether a task is still valid; follows the concept of Luigi
        # CONS: complex implementation (different checks for different pipeline steps)
        ...

        # SOLUTION 3: [<-MEMORY] check within the database whether the task was already run AND whether its entry is still valid
        # PROS: early termination of the workflow; simple implementation; follows the concept of Luigi
        # PLUS: the flag within the database can be used for later visualization, etc.
        # CONS: may only affect prediction and validation algorithms (otherwise the resolving of the
        #       access_history is required, which adds a lot of complexity)
        ...
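
Since params is abstract, every concrete pipeline step subclasses RunDocker and provides the parameters that are handed to its container. The following is a minimal sketch of such a subclass; the PredictionStep name, its time range parameters, the image name, and the command-line format of params are hypothetical:

Code Block
languagepy
titleA Concrete RunDocker Subclass (Sketch)
class PredictionStep(RunDocker):
    # hypothetical image name and task parameters
    image_name = luigi.Parameter(default="prediction-algorithm")
    time_start = luigi.Parameter(default="")
    time_end = luigi.Parameter(default="")

    @property
    def params(self):
        # assumed format: command-line arguments for the container
        return ["--time-start", str(self.time_start),
                "--time-end", str(self.time_end)]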

The most important part of the RunDocker class is the complete() method. This method decides whether a task has to be run or whether Luigi can skip the task and ALL of its dependencies.

A problem occurs if we have new or modified data within our database. Let us assume we first run a workflow, afterwards manipulate some data within the staging table, and then run our workflow again. The second time we run the workflow, Luigi will check from the last task upwards whether it is complete or not. If this task represents a prediction algorithm, its complete() method will return True, as the task does not yet know that its input data will be changed by a preceding task. Luigi would have to traverse the whole graph upwards towards the task which finally manipulates the data and provides new input data for all succeeding tasks.

In the following, we present three solutions to this problem:

Solution 1 [always re-run the workflow]

Description: Check whether all input targets exist. As a newly instantiated task does not know about its related docker container, its output targets do not exist. For that reason, all tasks are initially incomplete for Luigi and all tasks will be executed. We do not check whether new data is available within the database.

Advantages:

  • It provides a simple implementation.

Disadvantages:

  • The full graph has to be executed for each run.
  • It does not follow the concept of Luigi.
  • Existing data are always overwritten, which makes results based on the old data, e.g. predictions, invalid.

Solution 2 [check for invalid runs on the fly - JIT approach]

Description: Check whether an algorithm was successfully run, using the algorithm_run table within the database. Furthermore, check whether new data exist within the given time range [start_time, end_time]. Which data to consider depends on the algorithm type, e.g. feature extraction or prediction algorithm. As an example, for a prediction algorithm one could verify whether the latest creation datetime of a property within a property group used as input data for the algorithm is newer than the timestamp of the last successful run of the algorithm stored within the database. If so, the last run is outdated and we have to re-run the task (see the first sketch after this table).

Advantages:

  • An early termination of the graph is possible.
  • It follows the concept of Luigi.
  • Compared with solution (3), we can check whether a run is invalid on the fly, without the need for an invalidation flag.

Disadvantages:

  • It requires a more complex implementation (e.g. the differentiation of algorithm types and their input data).

Solution 3 [invalidate algorithms while overwriting data - MEMORY approach]

Description: Check whether an algorithm was successfully run AND is still valid, using the algorithm_run table within the database. While overwriting old data within the database, evaluate all dependent algorithm runs and set their state to invalid (see the second sketch after this table). This is especially cumbersome for algorithms whose inputs are only given by the access_history table, e.g. ML algorithms which store their trained parameters as an algorithm configuration. Here, the access_history query has to be re-executed and it has to be verified whether a newly added property is included.

Advantages:

  • An early termination of the graph is possible.
  • It follows the concept of Luigi.
  • Invalid algorithm runs are flagged, which may be useful for post-processing steps, e.g. the visualization of valid predictions.

Disadvantages:

  • It requires a more complex implementation (e.g. resolving property groups from the access_history).
  • We cannot check whether a data loader algorithm is invalid, as we do not have a relation between data within the staging table and the algorithm which added/modified the data.
As solutions 2 and 3 introduce a lot more complexity into our infrastructure, we would like to avoid them. Solution 1, however, has the serious disadvantage that it may invalidate previously computed results.

We therefore introduce the following approach:

We provide two datasets: one with current data and one with data older than 17 days. We do NOT allow any manual modification of the first dataset and expect that master data (e.g. SWPC data) older than 17 days is stable and does not change anymore.

While running a workflow every 6 hours on the current data, we know that only the newest prediction is accurate; all others are outdated and their input data cannot be reconstructed. As no modification is allowed, we know that the newest prediction is always accurate and that no manual modification of the underlying data has happened. Concerning the historical data, we do not need to worry about corrupting our data, as the master data, and thereby the resulting properties, will not change anymore.