如何使用 Celery 顺序执行独立任务?
How to execute independent tasks sequentially using Celery?
我必须并行安排一些对 运行 来说似乎非常复杂的任务。它们不依赖于彼此的结果,函数需要 3 个参数。
我已经尝试过使用链、地图和星图方法。使用链我得到这个错误:
[2019-04-23 15:28:00,991: ERROR/PoolWorker-3] Task proj.apps.tasks.generate[112a7426-5ac3-4cd6-8416-5591c3c018a3] raised unexpected: TypeError('get expected at least 1 arguments, got 0',)
Traceback (most recent call last):
File ".../local/lib/python2.7/site-packages/celery/app/trace.py", line 367, in trace_task
R = retval = fun(*args, **kwargs)
File ".../local/lib/python2.7/site-packages/celery/app/trace.py", line 622, in __protected_call__
return self.run(*args, **kwargs)
File ".../tasks.py", line 966, in generate
return res.get()
TypeError: get expected at least 1 arguments, got 0
使用 map
我无法传递所有参数,使用 starmap
所有任务都同时启动。
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
任务示例:
@shared_task
def generate(field1, field2, field3=None):
if field3 is not None:
return field1 + field2 + field3
return field1 + field2
使用链的代码:
res = chain(generate.s(i, 5, j) for i in array1 for j in array2)
return res.get()
使用星图的代码:
arguments = [(i, 4, j) for i in array1 for j in array2]
~generate.starmap(arguments)
如果任务真正独立,您应该使用 .si
而不是 .s
:
tasks = chain(generate.si(i, 5, j) for i in array1 for j in array2)
res = tasks()
return res.get()
我需要做的就是制作一条链,如下所示:
res = chain(generate(i, 2, j)for i in array1 for j in array2)()
return res.get()
然后是 运行 芹菜,带有一个设置最大线程数的附加参数
celery -A tasks worker --concurrency=1
我必须并行安排一些对 运行 来说似乎非常复杂的任务。它们不依赖于彼此的结果,函数需要 3 个参数。
我已经尝试过使用链、地图和星图方法。使用链我得到这个错误:
[2019-04-23 15:28:00,991: ERROR/PoolWorker-3] Task proj.apps.tasks.generate[112a7426-5ac3-4cd6-8416-5591c3c018a3] raised unexpected: TypeError('get expected at least 1 arguments, got 0',)
Traceback (most recent call last):
File ".../local/lib/python2.7/site-packages/celery/app/trace.py", line 367, in trace_task
R = retval = fun(*args, **kwargs)
File ".../local/lib/python2.7/site-packages/celery/app/trace.py", line 622, in __protected_call__
return self.run(*args, **kwargs)
File ".../tasks.py", line 966, in generate
return res.get()
TypeError: get expected at least 1 arguments, got 0
使用 map
我无法传递所有参数,使用 starmap
所有任务都同时启动。
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
任务示例:
@shared_task
def generate(field1, field2, field3=None):
if field3 is not None:
return field1 + field2 + field3
return field1 + field2
使用链的代码:
res = chain(generate.s(i, 5, j) for i in array1 for j in array2)
return res.get()
使用星图的代码:
arguments = [(i, 4, j) for i in array1 for j in array2]
~generate.starmap(arguments)
如果任务真正独立,您应该使用 .si
而不是 .s
:
tasks = chain(generate.si(i, 5, j) for i in array1 for j in array2)
res = tasks()
return res.get()
我需要做的就是制作一条链,如下所示:
res = chain(generate(i, 2, j)for i in array1 for j in array2)()
return res.get()
然后是 运行 芹菜,带有一个设置最大线程数的附加参数
celery -A tasks worker --concurrency=1