Celery:如何为基于 class 的任务创建签名?
Celery: How to create signature for a class-based task?
Python 3.7,芹菜 4.3
我注册了一个基于 class 的任务:
class TaskA(celery_app.Task):
name = 'task_a'
def run(self, my_param):
pass
celery_app.tasks.register(TaskA())
我想创建一个任务链,其中 TaskA 应该是链接之一。
这是我的做法:
my_param = "some value"
result = celery.chain(
TaskA.s(my_param),
# ...
)()
我收到错误:
TypeError: s() missing 1 required positional argument: 'self'
替换代码:
my_param = "some value"
result = celery.chain(
TaskA.s(my_param),
# ...
)()
与:
my_param = "some value"
task_ = TaskA() # <------
result = celery.chain(
task_.s(my_param),
# ...
)()
Python 3.7,芹菜 4.3
我注册了一个基于 class 的任务:
class TaskA(celery_app.Task):
name = 'task_a'
def run(self, my_param):
pass
celery_app.tasks.register(TaskA())
我想创建一个任务链,其中 TaskA 应该是链接之一。
这是我的做法:
my_param = "some value"
result = celery.chain(
TaskA.s(my_param),
# ...
)()
我收到错误:
TypeError: s() missing 1 required positional argument: 'self'
替换代码:
my_param = "some value"
result = celery.chain(
TaskA.s(my_param),
# ...
)()
与:
my_param = "some value"
task_ = TaskA() # <------
result = celery.chain(
task_.s(my_param),
# ...
)()