如何在 Celery 中创建条件子任务链?

How can I create a chain of conditional subtasks in Celery?

我正在创建一个应用程序,该应用程序将创建要执行的任务链,但该链将根据用户对添加该部分的愿望执行任务。

例如,如果用户想要 start_boo 链可能是:

def start_boo():
    chain = start_foo.s() | start_bar.s() | start_baz.s()
    chain()

但是,如果 foobaz 已经开始,我们就不想这样做;而不是喜欢这样的东西:

def start_boo(foo=True, bar=True, baz=True):
    if not (foo or bar or baz):
        raise Exception("At least one should be true...")
    chain = None
    if foo:
       chain |= start_foo.s()
    if bar:
        chain |= start_bar.s()
    if baz:
        chain |= start_baz.s()
    chain()

start_boo(foo=False, baz=False)

但是,由于各种原因,这行不通。

有这样的成语吗?

成语是 functools 中的 reduce 函数。您可以执行以下操作:

def start_boo(foo=True, bar=True, baz=True):
    if not (foo or bar or baz):
        raise Exception("At least one should be true...")

    todo_tasks = [foo, bar, baz]
    start_tasks = [start_foo, start_bar, start_baz]

    # tasks contains start tasks which should be done per the options.
    # if it's False in todo_tasks, its associated start_task isn't added
    tasks = [start_task for todo, start_task in zip(todo_tasks, start_tasks) if todo]
    first_task, rest = *tasks  

    # start with the first task to be completed and chain it with remaining tasks
    chain = functools.reduce(lambda x, y: x | y.s(), rest, first_task.s())
    chain()