Celery 运行 组中的任务链
Celery running chain of tasks in group
我正在尝试 运行 Celery 作为任务管理器,但在 运行 在一个组中执行多个任务时遇到了问题。组内所有任务完成后,我想收集结果。如果组中只有 1 个任务,工作流工作正常,它等待所有任务完成。但是,如果组中有 2 个或更多任务,或者我没有正确 运行 配置它,它就会失败。下面是代码示例
@celery2.task(name='square')
def square(a):
log.info(f'In square group {a}')
return a**a
@celery2.task(name='add_one')
def add_one(a):
b = a+1
return b
@celery2.task(name='add_one_and_square')
def add_one_and_square(a):
return (add_one.s(a) | square.s())
@celery2.task(name='collect')
def collect(a):
return a
@celery2.task(name='group-task')
def group_square(num):
return group([(add_one_and_square(i)) for i in range(num)])
运行 芹菜工作流程:
res = (add.s(2,3) | group_square.s()|collect.s())
res.apply_async()
下面是从输出中捕获的数据,我看到创建了签名,不确定这是否是正确的方法或如何 运行 单个组中的任务链,因此它具有类似于单个的行为小组任务。
{'task': 'celery.group',
'args': [],
'kwargs': {'tasks': [{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [0],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [1],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [2],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [3],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [4],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'group',
'immutable': False,
'chord_size': None}
如有任何见解,我将不胜感激。谢谢!
我认为在另一个 celery 任务中 运行ning celery 任务是一种不好的做法,在某些情况下可能会导致死锁(我认为它也在文档中的某个地方)。如果你想这样做 - 运行 aync 可能更安全。
在您的方案中,我建议在 group_square
任务中添加对方付费电话。类似于:
@celery2.task(name='group-task')
def group_square(num):
canvas_flow = group([(add_one_and_square.si(i)) for i in range(num)]) | collect.s()
return canvas_flow.apply_async()
现在 group_square 的结果将是 ResultAsync
类似的结果。您可以随时检查 .ready()
然后得到 .result()
.
我正在尝试 运行 Celery 作为任务管理器,但在 运行 在一个组中执行多个任务时遇到了问题。组内所有任务完成后,我想收集结果。如果组中只有 1 个任务,工作流工作正常,它等待所有任务完成。但是,如果组中有 2 个或更多任务,或者我没有正确 运行 配置它,它就会失败。下面是代码示例
@celery2.task(name='square')
def square(a):
log.info(f'In square group {a}')
return a**a
@celery2.task(name='add_one')
def add_one(a):
b = a+1
return b
@celery2.task(name='add_one_and_square')
def add_one_and_square(a):
return (add_one.s(a) | square.s())
@celery2.task(name='collect')
def collect(a):
return a
@celery2.task(name='group-task')
def group_square(num):
return group([(add_one_and_square(i)) for i in range(num)])
运行 芹菜工作流程:
res = (add.s(2,3) | group_square.s()|collect.s())
res.apply_async()
下面是从输出中捕获的数据,我看到创建了签名,不确定这是否是正确的方法或如何 运行 单个组中的任务链,因此它具有类似于单个的行为小组任务。
{'task': 'celery.group',
'args': [],
'kwargs': {'tasks': [{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [0],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [1],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [2],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [3],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [4],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'group',
'immutable': False,
'chord_size': None}
如有任何见解,我将不胜感激。谢谢!
我认为在另一个 celery 任务中 运行ning celery 任务是一种不好的做法,在某些情况下可能会导致死锁(我认为它也在文档中的某个地方)。如果你想这样做 - 运行 aync 可能更安全。
在您的方案中,我建议在 group_square
任务中添加对方付费电话。类似于:
@celery2.task(name='group-task')
def group_square(num):
canvas_flow = group([(add_one_and_square.si(i)) for i in range(num)]) | collect.s()
return canvas_flow.apply_async()
现在 group_square 的结果将是 ResultAsync
类似的结果。您可以随时检查 .ready()
然后得到 .result()
.