...
```python
import luigi
from luigi.task import flatten


class Task2(luigi.Task):
    task_namespace = 'examples'
    param1 = luigi.IntParameter()
    task2 = None
    task3 = None

    def requires(self):
        yield Task1()

    def output(self):
        return [self.task2, self.task3]

    def run(self):
        print("Run Task {task}.".format(task=self.__class__.__name__))
        print("Parameters: (param1: {0})".format(self.param1))
        # produce Target2 and Target3
        self.task2 = Target2(self.param1)
        self.task3 = Target3(self.param1)

    def complete(self):
        outputs = flatten(self.output())
        return all(map(lambda output: output.exists(), outputs))
```
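The completeness logic above is simple but central: Luigi skips a task exactly when every output target reports that it exists. The following stand-alone sketch reproduces that logic without Luigi installed; `FakeTarget` and the simplified `flatten` are our own stand-ins, not project code.

```python
class FakeTarget:
    """Hypothetical stand-in for a luigi.Target with an exists() check."""

    def __init__(self, exists):
        self._exists = exists

    def exists(self):
        return self._exists


def flatten(struct):
    """Simplified version of luigi.task.flatten: turn nested lists into a flat list."""
    if isinstance(struct, list):
        flat = []
        for item in struct:
            flat.extend(flatten(item))
        return flat
    return [struct]


def is_complete(outputs):
    """A task is complete iff every output target exists (the logic of Task2.complete)."""
    return all(target.exists() for target in flatten(outputs))


print(is_complete([FakeTarget(True), FakeTarget(True)]))   # all outputs exist: Luigi skips the task
print(is_complete([FakeTarget(True), FakeTarget(False)]))  # one output missing: the task must run
```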
...
```python
import time

import luigi


class RunDocker(luigi.Task):
    image_id = luigi.Parameter(default="")
    image_name = luigi.Parameter(default="")
    __docker_container = None

    @property
    def docker_container(self):
        if self.__docker_container is None:
            self.__docker_container = DockerContainer(str(self.image_id), str(self.image_name))
        return self.__docker_container

    @property
    def params(self):
        raise NotImplementedError("This method must be overridden")

    def output(self):
        return self.docker_container

    def run(self):
        self.docker_container.params = self.params
        self.docker_container.run()
        # wait until the container terminates
        while self.docker_container.is_running:
            time.sleep(1)

    def complete(self):
        # SOLUTION 1: do all input targets exist? (force a run of all upstream tasks)
        #   PROS: no need to check whether a task is still valid; simple implementation; follows the concept of Luigi
        #   CONS: runs the full graph; (always) overwrites existing entries (if not handled by the algorithm); ...
        # SOLUTION 2: [CHECK->] check within the database whether the task was already run AND
        #   whether new* [staging data, properties or predictions] exist within the given time range [time_start, time_end]
        #   * i.e. if last_algo_run_timestamp < latest_property_timestamp_with_propertygroup_within_timerange
        #   PROS: early termination of the workflow; no need to check whether a task is still valid; follows the concept of Luigi
        #   CONS: complex implementation (different checks for different pipeline steps) ...
        # SOLUTION 3: [<-MEMORY] check within the database whether the task was already run AND whether its entry is still valid
        #   PROS: early termination of the workflow; simple implementation; follows the concept of Luigi
        #   PLUS: the flag within the database can be used for later visualization, etc.
        #   CONS: may only affect prediction and validation algorithms (otherwise resolving the access_history
        #   is required, which adds a lot of complexity) ...
        pass  # strategy still to be chosen from the solutions above
```
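RunDocker uses its DockerContainer both as execution handle (`run()`, `is_running`) and as its output target (`exists()`). The following minimal sketch shows a class that fulfils both roles; it is our own assumption of the interface, with termination simulated instead of calling the Docker API, and all names are hypothetical.

```python
class DockerContainer:
    """Hypothetical docker-container wrapper that doubles as a Luigi target."""

    def __init__(self, image_id, image_name):
        self.image_id = image_id
        self.image_name = image_name
        self.params = None
        self._running = False
        self._finished = False

    def run(self):
        # A real implementation would start the container via the Docker API;
        # here we simulate an immediately terminating container.
        self._running = True
        self._running = False
        self._finished = True

    @property
    def is_running(self):
        return self._running

    def exists(self):
        # Called by Luigi's completeness check: the "target" exists once the container has run.
        return self._finished


container = DockerContainer("42", "example/image")
print(container.exists())  # False before run()
container.run()
print(container.exists())  # True afterwards
```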
The most important part of the RunDocker class is the complete() method. This method decides whether a task has to be run or whether Luigi can skip the task and ALL of its dependencies.
...
A problem occurs if we have new or modified data within our database. Assume we first run a workflow, then manipulate some data within the staging table, and afterwards run the workflow again. On this second run, Luigi checks from the last task upwards whether each task is complete. If the last task represents a prediction algorithm, its complete() method will return True, because the task does not yet know that its input data will be changed by a preceding task. Luigi would have to traverse the whole graph upwards to the task that actually manipulates the data and provides new input data for all succeeding tasks.
In the following, we present three solutions to this problem:
Solution | Description | Advantages | Disadvantages |
---|---|---|---|
1 | [always re-run the workflow] Check whether all input targets exist, as a newly instantiated task does not know about its related docker container. | We never need to check whether new data are available within the database. It provides a simple implementation. | The full graph has to be executed for each run. It does not follow the concept of Luigi. Existing data are always overwritten, which invalidates previously computed results. |
2 | [check for invalid runs on the fly - JIT approach] Check whether an algorithm was successfully run, using the algorithm_run data. Which data to consider depends on the algorithm type, e.g. feature extraction. | An early termination of the graph is possible. It follows the concept of Luigi. Compared with solution (3), we can check on the fly whether a run is still valid. | It requires a more complex implementation (e.g. the differentiation of algorithm types and their input data). |
3 | [invalidate algorithms while overwriting data - MEMORY approach] Check whether an algorithm was successfully run AND is still valid. While overwriting old data within the database, evaluate all dependent algorithm runs and flag them as invalid. | An early termination of the graph is possible. It follows the concept of Luigi. Invalid algorithm runs are flagged, which may be useful for post-processing (e.g. visualization). | It requires a more complex implementation. We cannot check whether a data loader algorithm is still valid. |
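The MEMORY approach of solution 3 can be sketched as bookkeeping over two flags per algorithm run, here called `finished` and `valid` (the column names and the in-memory dictionary are our assumptions; the real design would keep this state in the database):

```python
# Hypothetical in-memory stand-in for the algorithm-run bookkeeping of solution 3.
algorithm_runs = {}  # task_id -> {"finished": bool, "valid": bool}


def mark_run(task_id):
    """Record a successful run: finished and (for now) valid."""
    algorithm_runs[task_id] = {"finished": True, "valid": True}


def invalidate_dependents(task_ids):
    """Called while overwriting data: flag all dependent runs as invalid."""
    for task_id in task_ids:
        if task_id in algorithm_runs:
            algorithm_runs[task_id]["valid"] = False


def is_complete(task_id):
    """Solution 3: complete iff the run finished AND its entry is still valid."""
    entry = algorithm_runs.get(task_id)
    return entry is not None and entry["finished"] and entry["valid"]


mark_run("prediction_1")
print(is_complete("prediction_1"))       # True: Luigi can terminate the graph early
invalidate_dependents(["prediction_1"])  # staging data were overwritten
print(is_complete("prediction_1"))       # False: the task must re-run
```

The `invalidate_dependents` hook is the crucial part: it runs at write time, so the later completeness check stays a cheap flag lookup instead of a graph traversal.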
As solutions 2 and 3 introduce considerably more complexity into our infrastructure, we would like to avoid them. Solution 1, however, has the serious disadvantage that it may invalidate previously computed results.
We therefore introduce the following revised approach:
We provide two datasets: one with current data and one with data older than 17 days. We do NOT allow any manual modification of the first dataset, and we expect that master data (e.g. SWPC data) older than 17 days are stable and no longer change.
While running a workflow every 6 hours on the current data, we know that only the newest prediction is accurate; all others are outdated and their input data cannot be reconstructed. Since no modification is allowed, we know that the newest prediction is always accurate and that no manual modification of the underlying data has happened. Concerning the historical data, we do not need to worry about corrupting our results, as the master data, and hence the resulting properties, no longer change.
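The dataset split can be sketched as a single predicate on a record's age; the 17-day boundary is taken from the text, while the function and dataset names are our own illustration:

```python
from datetime import datetime, timedelta

# Master data older than this are assumed stable (taken from the text).
STABLE_AFTER = timedelta(days=17)


def is_stable(record_timestamp, now):
    """Master data (e.g. SWPC data) older than 17 days are assumed not to change anymore."""
    return now - record_timestamp >= STABLE_AFTER


def target_dataset(record_timestamp, now):
    """Route a record to the current or the historical dataset."""
    return "historical" if is_stable(record_timestamp, now) else "current"


now = datetime(2020, 1, 31)
print(target_dataset(datetime(2020, 1, 30), now))  # current: may still change
print(target_dataset(datetime(2020, 1, 1), now))   # historical: stable
```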