如何在 Celery 中创建条件子任务链?
How can I create a chain of conditional subtasks in Celery?
我正在创建一个应用程序,该应用程序将创建要执行的任务链,但该链将根据用户对添加该部分的愿望执行任务。
例如,如果用户想要 start_boo
链可能是:
def start_boo():
chain = start_foo.s() | start_bar.s() | start_baz.s()
chain()
但是,如果 foo
和 baz
已经开始,我们就不想这样做;而不是喜欢这样的东西:
def start_boo(foo=True, bar=True, baz=True):
if not (foo or bar or baz):
raise Exception("At least one should be true...")
chain = None
if foo:
chain |= start_foo.s()
if bar:
chain |= start_bar.s()
if baz:
chain |= start_baz.s()
chain()
start_boo(foo=False, baz=False)
但是,由于各种原因,这行不通。
有这样的成语吗?
成语是 functools
中的 reduce
函数。您可以执行以下操作:
def start_boo(foo=True, bar=True, baz=True):
if not (foo or bar or baz):
raise Exception("At least one should be true...")
todo_tasks = [foo, bar, baz]
start_tasks = [start_foo, start_bar, start_baz]
# tasks contains start tasks which should be done per the options.
# if it's False in todo_tasks, its associated start_task isn't added
tasks = [start_task for todo, start_task in zip(todo_tasks, start_tasks) if todo]
first_task, rest = *tasks
# start with the first task to be completed and chain it with remaining tasks
chain = functools.reduce(lambda x, y: x | y.s(), rest, first_task.s())
chain()
我正在创建一个应用程序,该应用程序将创建要执行的任务链,但该链将根据用户对添加该部分的愿望执行任务。
例如,如果用户想要 start_boo
链可能是:
def start_boo():
chain = start_foo.s() | start_bar.s() | start_baz.s()
chain()
但是,如果 foo
和 baz
已经开始,我们就不想这样做;而不是喜欢这样的东西:
def start_boo(foo=True, bar=True, baz=True):
if not (foo or bar or baz):
raise Exception("At least one should be true...")
chain = None
if foo:
chain |= start_foo.s()
if bar:
chain |= start_bar.s()
if baz:
chain |= start_baz.s()
chain()
start_boo(foo=False, baz=False)
但是,由于各种原因,这行不通。
有这样的成语吗?
成语是 functools
中的 reduce
函数。您可以执行以下操作:
def start_boo(foo=True, bar=True, baz=True):
if not (foo or bar or baz):
raise Exception("At least one should be true...")
todo_tasks = [foo, bar, baz]
start_tasks = [start_foo, start_bar, start_baz]
# tasks contains start tasks which should be done per the options.
# if it's False in todo_tasks, its associated start_task isn't added
tasks = [start_task for todo, start_task in zip(todo_tasks, start_tasks) if todo]
first_task, rest = *tasks
# start with the first task to be completed and chain it with remaining tasks
chain = functools.reduce(lambda x, y: x | y.s(), rest, first_task.s())
chain()