celery_tasktree 一般不支持 DAG 工作流。有其他选择吗?
celery_tasktree does not support DAG workflow in general. Is there an alternative?
celery_tasktree (https://pypi.python.org/pypi/celery-tasktree) provides a cleaner workflow canvas compared to celery workflow scheduler (http://docs.celeryproject.org/en/latest/userguide/canvas.html)。但是,它只支持树状工作流结构,不支持一般的 DAG 式工作流。 Celery 工作流程确实有 "chords" 方法,但使用起来似乎很麻烦。
是否有任何其他基于 celery 的库类似于 celery_tasktree 适用于一般 DAG 工作流程?
这里有几个库支持基于 DAG 的作业调度程序。
https://github.com/thieman/dagobah
https://github.com/apache/incubator-airflow
它们不是基于芹菜。但是,您可以在 celery 中创建自己的基元来转发可用于构建 DAG 作业调度程序的结果。
@app.task(bind=True)
def forward(self, result, sig):
# convert JSON serialized signature back to Signature
sig = self.app.signature(sig)
# get the return value of the provided task signature
result2 = sig()
# next task will receive tuple of (result_A, result_B)
return (result, result2)
@app.task
def C(self, Ares_Bres)):
Ares, Bres = Ares__Bres
return Ares + Bres
workflow = (A.s() | forward.s(B.s()) | C.s())
See here 进行更详细的讨论。
celery_tasktree (https://pypi.python.org/pypi/celery-tasktree) provides a cleaner workflow canvas compared to celery workflow scheduler (http://docs.celeryproject.org/en/latest/userguide/canvas.html)。但是,它只支持树状工作流结构,不支持一般的 DAG 式工作流。 Celery 工作流程确实有 "chords" 方法,但使用起来似乎很麻烦。
是否有任何其他基于 celery 的库类似于 celery_tasktree 适用于一般 DAG 工作流程?
这里有几个库支持基于 DAG 的作业调度程序。
https://github.com/thieman/dagobah
https://github.com/apache/incubator-airflow
它们不是基于芹菜。但是,您可以在 celery 中创建自己的基元来转发可用于构建 DAG 作业调度程序的结果。
@app.task(bind=True)
def forward(self, result, sig):
# convert JSON serialized signature back to Signature
sig = self.app.signature(sig)
# get the return value of the provided task signature
result2 = sig()
# next task will receive tuple of (result_A, result_B)
return (result, result2)
@app.task
def C(self, Ares_Bres)):
Ares, Bres = Ares__Bres
return Ares + Bres
workflow = (A.s() | forward.s(B.s()) | C.s())
See here 进行更详细的讨论。