...
Code Block | ||||
---|---|---|---|---|
| ||||
class RunDocker(luigi.Task): imageId = luigi.Parameter(default="") imageName = luigi.Parameter(default="") __dockerContainer = None @property def docker_container(self): if self.__dockerContainer is None: self.__dockerContainer = DockerContainer(str(self.imageId), str(self.imageName)) return self.__dockerContainer @property def params(self): raise NotImplementedError("This method must be overridden") def output(self): return self.docker_container def run(self): self.docker_container.params = self.params self.docker_container.run() # wait until the container was terminated while self.docker_container.is_running: time.sleep(1) def complete(self): # SOLUTION1: do all input targets exist? (force a run of all upstream tasks) # PROS: No need of checking wherever a task is still valid; Simple implementation; Follows the concept of Luigi # CONS: Run full graph; (Always) overwrites existing entries (if not handled by algorithm); ... # SOLUTION2: [CHECK->] check within database if task was already run & check wherever new* [staging data or properties or predictions] exist within the given time range [time_start, time_end] * if last_algo_run_timestamp < latest_property_timestamp_with_propertygroup_within_timerange # PROS: Early termination of workflow; No need of checking wherever a task is still valid; Simple implementation; Follows the concept of Luigi # CONS: Complex implementation (different checks for different pipeline steps) ... # SOLUTION3: [<-MEMORY] check within database if task was already run AND if its entry is still valide # PROS: Early termination of workflow; Simple implementation; Follows the concept of Luigi # PLUS: Flag within database can be used for later visualization, etc. # CONS: May only affect prediction and validation algorithms (otherwise the resolving of the access_history is req. which adds a lot of complexity) ... |
The most important part of the RunDocker class is the complete() method. This method decides wherever a Task has to be run or if Luigi can skip the task and ALL its dependencies.