Python 任务调度器 Luigi 可以检测间接依赖吗?

Can Python task scheduler Luigi detect indirect dependencies?

短版:

Python 中是否有可以执行 gmake 功能的任务调度程序?特别是,我需要一个递归解决依赖关系的任务调度程序。我调查了 Luigi,但它似乎只能解决直接依赖关系。

长版:

我正在尝试构建一个工作流程,以预定义的顺序处理大量数据文件,后面的任务可能直接依赖于一些较早任务的输出,但反过来,这些输出的正确性依赖于更简单的任务。

例如,让我们考虑如下依赖图:

A <- B <- C

当我请求任务C的结果时,Luigi会自动调度B,然后由于B依赖A,它会调度A。所以最后的运行顺序将是[A,B,C ].每个任务都会创建一个官方输出文件作为成功执行的标志。这对第一个 运行 没问题。

现在,假设我在任务 A 的输入数据中犯了一个错误。显然,我需要重新运行 整个链。但是,简单地从 A 中删除输出文件是行不通的。因为Luigi看到了B和C的输出,断定Task C的要求已经完成,不需要运行了。我必须从 ALL 依赖于 A 的任务中删除输出文件,以便它们再次成为 运行。在简单的情况下,我必须删除 A、B 和 C 的所有输出文件,以便 Luigi 检测到对 A 所做的更改。

这是一个非常不方便的功能。如果我有数十个或数百个相互之间具有相当复杂依赖关系的任务,那么当其中一个任务需要重新 运行 时,很难判断哪些任务受到影响。对于任务调度程序并具有解决依赖关系的能力,我希望 Luigi 能够像 GNU-Make 那样工作,递归地检查依赖关系,并且当最深的源文件之一发生更改时将重建最终目标。

我想知道是否有人可以就此问题提供一些建议。我是否遗漏了 Luigi 中的一些关键功能?还有其他任务调度程序可以充当 gmake 吗?我对基于 Python 的软件包特别感兴趣,并且更喜欢那些支持 Windows.

的软件包

非常感谢!

似乎可以通过覆盖任务的完整方法来实现。您必须在依赖关系图中一直应用它。

def complete(self):
    outputs = self.flatten(self.output())
    if not all(map(lambda output: output.exists(), outputs)):
        return False
    for task in self.flatten(self.requires()):
        if not task.complete():
            for output in outputs:
                if output.exists():
                    output.remove()
            return False
    return True

确实这很不方便,d6tflow 检查所有上游依赖项的完整性,而不仅仅是 TaskC 的输出存在。如果重置TaskA,TaskC也将不完整并自动重新运行。

# reset TaskA => makes TaskC incomplete
TaskA().invalidate() 
d6tflow.preview(TaskC()) # all tasks pending

有关详细信息,请参阅下面的完整示例和 d6tflow docs

import d6tflow
import pandas as pd

class TaskA(d6tflow.tasks.TaskCachePandas):  # save dataframe in memory

    def run(self):        
        self.save(pd.DataFrame({'a':range(10)})) # quickly save dataframe

class TaskB(d6tflow.tasks.TaskCachePandas):

    def requires(self):
        return TaskA() # define dependency

    def run(self):
        df = self.input().load() # quickly load required data
        df = df*2
        self.save(df)

class TaskC(d6tflow.tasks.TaskCachePandas):

    def requires(self):
        return TaskB()

    def run(self):
        df = self.input().load() 
        df = df*2
        self.save(df)

# Check task dependencies and their execution status
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
   └─--[TaskB-{} (PENDING)]
      └─--[TaskA-{} (PENDING)]
'''

# Execute the model training task including dependencies
d6tflow.run(TaskC())

'''
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 TaskC()
'''

# all tasks complete
d6tflow.preview(TaskC())

'''
└─--[TaskC-{} (COMPLETE)]
   └─--[TaskB-{} (COMPLETE)]
      └─--[TaskA-{} (COMPLETE)]
'''

# reset TaskA => makes TaskC incomplete
TaskA().invalidate() 
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
   └─--[TaskB-{} (PENDING)]
      └─--[TaskA-{} (PENDING)]
'''