Python 任务调度器 Luigi 可以检测间接依赖吗?
Can Python task scheduler Luigi detect indirect dependencies?
短版:
Python 中是否有可以执行 gmake 功能的任务调度程序?特别是,我需要一个递归解决依赖关系的任务调度程序。我调查了 Luigi,但它似乎只能解决直接依赖关系。
长版:
我正在尝试构建一个工作流程,以预定义的顺序处理大量数据文件,后面的任务可能直接依赖于一些较早任务的输出,但反过来,这些输出的正确性依赖于更简单的任务。
例如,让我们考虑如下依赖图:
A <- B <- C
当我请求任务C的结果时,Luigi会自动调度B,然后由于B依赖A,它会调度A。所以最后的运行顺序将是[A,B,C ].每个任务都会创建一个官方输出文件作为成功执行的标志。这对第一个 运行 没问题。
现在,假设我在任务 A 的输入数据中犯了一个错误。显然,我需要重新运行 整个链。但是,简单地从 A 中删除输出文件是行不通的。因为Luigi看到了B和C的输出,断定Task C的要求已经完成,不需要运行了。我必须从 ALL 依赖于 A 的任务中删除输出文件,以便它们再次成为 运行。在简单的情况下,我必须删除 A、B 和 C 的所有输出文件,以便 Luigi 检测到对 A 所做的更改。
这是一个非常不方便的功能。如果我有数十个或数百个相互之间具有相当复杂依赖关系的任务,那么当其中一个任务需要重新 运行 时,很难判断哪些任务受到影响。对于任务调度程序并具有解决依赖关系的能力,我希望 Luigi 能够像 GNU-Make 那样工作,递归地检查依赖关系,并且当最深的源文件之一发生更改时将重建最终目标。
我想知道是否有人可以就此问题提供一些建议。我是否遗漏了 Luigi 中的一些关键功能?还有其他任务调度程序可以充当 gmake 吗?我对基于 Python 的软件包特别感兴趣,并且更喜欢那些支持 Windows.
的软件包
非常感谢!
似乎可以通过覆盖任务的完整方法来实现。您必须在依赖关系图中一直应用它。
def complete(self):
outputs = self.flatten(self.output())
if not all(map(lambda output: output.exists(), outputs)):
return False
for task in self.flatten(self.requires()):
if not task.complete():
for output in outputs:
if output.exists():
output.remove()
return False
return True
确实这很不方便,d6tflow 检查所有上游依赖项的完整性,而不仅仅是 TaskC 的输出存在。如果重置TaskA,TaskC也将不完整并自动重新运行。
# reset TaskA => makes TaskC incomplete
TaskA().invalidate()
d6tflow.preview(TaskC()) # all tasks pending
有关详细信息,请参阅下面的完整示例和 d6tflow docs。
import d6tflow
import pandas as pd
class TaskA(d6tflow.tasks.TaskCachePandas): # save dataframe in memory
def run(self):
self.save(pd.DataFrame({'a':range(10)})) # quickly save dataframe
class TaskB(d6tflow.tasks.TaskCachePandas):
def requires(self):
return TaskA() # define dependency
def run(self):
df = self.input().load() # quickly load required data
df = df*2
self.save(df)
class TaskC(d6tflow.tasks.TaskCachePandas):
def requires(self):
return TaskB()
def run(self):
df = self.input().load()
df = df*2
self.save(df)
# Check task dependencies and their execution status
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
└─--[TaskB-{} (PENDING)]
└─--[TaskA-{} (PENDING)]
'''
# Execute the model training task including dependencies
d6tflow.run(TaskC())
'''
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 3 ran successfully:
- 1 TaskA()
- 1 TaskB()
- 1 TaskC()
'''
# all tasks complete
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (COMPLETE)]
└─--[TaskB-{} (COMPLETE)]
└─--[TaskA-{} (COMPLETE)]
'''
# reset TaskA => makes TaskC incomplete
TaskA().invalidate()
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
└─--[TaskB-{} (PENDING)]
└─--[TaskA-{} (PENDING)]
'''
短版:
Python 中是否有可以执行 gmake 功能的任务调度程序?特别是,我需要一个递归解决依赖关系的任务调度程序。我调查了 Luigi,但它似乎只能解决直接依赖关系。
长版:
我正在尝试构建一个工作流程,以预定义的顺序处理大量数据文件,后面的任务可能直接依赖于一些较早任务的输出,但反过来,这些输出的正确性依赖于更简单的任务。
例如,让我们考虑如下依赖图:
A <- B <- C
当我请求任务C的结果时,Luigi会自动调度B,然后由于B依赖A,它会调度A。所以最后的运行顺序将是[A,B,C ].每个任务都会创建一个官方输出文件作为成功执行的标志。这对第一个 运行 没问题。
现在,假设我在任务 A 的输入数据中犯了一个错误。显然,我需要重新运行 整个链。但是,简单地从 A 中删除输出文件是行不通的。因为Luigi看到了B和C的输出,断定Task C的要求已经完成,不需要运行了。我必须从 ALL 依赖于 A 的任务中删除输出文件,以便它们再次成为 运行。在简单的情况下,我必须删除 A、B 和 C 的所有输出文件,以便 Luigi 检测到对 A 所做的更改。
这是一个非常不方便的功能。如果我有数十个或数百个相互之间具有相当复杂依赖关系的任务,那么当其中一个任务需要重新 运行 时,很难判断哪些任务受到影响。对于任务调度程序并具有解决依赖关系的能力,我希望 Luigi 能够像 GNU-Make 那样工作,递归地检查依赖关系,并且当最深的源文件之一发生更改时将重建最终目标。
我想知道是否有人可以就此问题提供一些建议。我是否遗漏了 Luigi 中的一些关键功能?还有其他任务调度程序可以充当 gmake 吗?我对基于 Python 的软件包特别感兴趣,并且更喜欢那些支持 Windows.
的软件包非常感谢!
似乎可以通过覆盖任务的完整方法来实现。您必须在依赖关系图中一直应用它。
def complete(self):
outputs = self.flatten(self.output())
if not all(map(lambda output: output.exists(), outputs)):
return False
for task in self.flatten(self.requires()):
if not task.complete():
for output in outputs:
if output.exists():
output.remove()
return False
return True
确实这很不方便,d6tflow 检查所有上游依赖项的完整性,而不仅仅是 TaskC 的输出存在。如果重置TaskA,TaskC也将不完整并自动重新运行。
# reset TaskA => makes TaskC incomplete
TaskA().invalidate()
d6tflow.preview(TaskC()) # all tasks pending
有关详细信息,请参阅下面的完整示例和 d6tflow docs。
import d6tflow
import pandas as pd
class TaskA(d6tflow.tasks.TaskCachePandas): # save dataframe in memory
def run(self):
self.save(pd.DataFrame({'a':range(10)})) # quickly save dataframe
class TaskB(d6tflow.tasks.TaskCachePandas):
def requires(self):
return TaskA() # define dependency
def run(self):
df = self.input().load() # quickly load required data
df = df*2
self.save(df)
class TaskC(d6tflow.tasks.TaskCachePandas):
def requires(self):
return TaskB()
def run(self):
df = self.input().load()
df = df*2
self.save(df)
# Check task dependencies and their execution status
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
└─--[TaskB-{} (PENDING)]
└─--[TaskA-{} (PENDING)]
'''
# Execute the model training task including dependencies
d6tflow.run(TaskC())
'''
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 3 ran successfully:
- 1 TaskA()
- 1 TaskB()
- 1 TaskC()
'''
# all tasks complete
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (COMPLETE)]
└─--[TaskB-{} (COMPLETE)]
└─--[TaskA-{} (COMPLETE)]
'''
# reset TaskA => makes TaskC incomplete
TaskA().invalidate()
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
└─--[TaskB-{} (PENDING)]
└─--[TaskA-{} (PENDING)]
'''