celery_tasktree 一般不支持 DAG 工作流。有其他选择吗?

celery_tasktree does not support DAG workflow in general. Is there an alternative?

celery_tasktree (https://pypi.python.org/pypi/celery-tasktree) provides a cleaner workflow canvas compared to celery workflow scheduler (http://docs.celeryproject.org/en/latest/userguide/canvas.html)。但是,它只支持树状工作流结构,不支持一般的 DAG 式工作流。 Celery 工作流程确实有 "chords" 方法,但使用起来似乎很麻烦。

是否有任何其他基于 celery 的库类似于 celery_tasktree 适用于一般 DAG 工作流程?

这里有几个库支持基于 DAG 的作业调度程序。

https://github.com/thieman/dagobah

https://github.com/apache/incubator-airflow

它们不是基于芹菜。但是,您可以在 celery 中创建自己的基元来转发可用于构建 DAG 作业调度程序的结果。

@app.task(bind=True)
def forward(self, result, sig):
    # convert JSON serialized signature back to Signature
    sig = self.app.signature(sig)
    # get the return value of the provided task signature
    result2 = sig()
    # next task will receive tuple of (result_A, result_B)
    return (result, result2)


@app.task
def C(self, Ares_Bres)):
    Ares, Bres = Ares__Bres
    return Ares + Bres

workflow = (A.s() | forward.s(B.s()) | C.s())

See here 进行更详细的讨论。