如何获取预定芹菜任务的任务对象?

How to get Task objects for scheduled celery tasks?

此题类似于Retrieve list of tasks in a queue in Celery, but I'd like to obtain the actual Task objects (cf. http://docs.celeryproject.org/en/latest/reference/celery.events.state.html#celery.events.state.Task)而不是字典表示。

当我做的时候

from celery.task.control import inspect
i = inspect()

然后从 shell 执行 i.scheduled(),我得到的结果类似于

In [75]: i.scheduled()
Out[75]: 
{'celery@Kurts-MacBook-Pro-3.local': [{'eta': '2019-08-01T01:31:37.843141+00:00',
   'priority': 6,
   'request': {'acknowledged': False,
    'args': "[126, 'Business Signup', {'actualCompanyName': 'Gilmore and Beck LLC', 'gigsEnabledRegionMapping': True, 'companyName': 'Gilmore and Beck LLC', 'companyRegionMapping': 'Lesliechester', 'companyId': 'ow9DMA8'}]",
    'delivery_info': {'exchange': '',
     'priority': 0,
     'redelivered': None,
     'routing_key': 'celery'},
    'hostname': 'celery@Kurts-MacBook-Pro-3.local',
    'id': '4ecdc400-8421-4a06-babc-98493362ec67',
    'kwargs': '{}',
    'name': 'backend.tasks.task_send_event_to_iterable',
    'time_start': None,
    'type': 'backend.tasks.task_send_event_to_iterable',
    'worker_pid': None}},
  {'eta': '2019-08-01T01:39:21.205879+00:00',
   'priority': 6,
   'request': {'acknowledged': False,
    'args': "('hyjomuz@mailinator.net',)",
    'delivery_info': {'exchange': '',
     'priority': 0,
     'redelivered': None,
     'routing_key': 'celery'},
    'hostname': 'celery@Kurts-MacBook-Pro-3.local',
    'id': '294910a3-2323-4fcf-9768-115c1a8c5e06',
    'kwargs': '{}',
    'name': 'backend.tasks.task_send_business_lead_notification',
    'time_start': None,
    'type': 'backend.tasks.task_send_business_lead_notification',
    'worker_pid': None}}]}

我想搜索一下这些任务并有条件地撤销一个。但是,我想遍历结果并让实际任务触手可及,就像 How to inspect and cancel Celery tasks by task name 中的示例一样。但是如果我尝试做 celery.events.state.State(),我没有得到任何事件:

In [76]: celery.events.state.State()
Out[76]: <State: events=0 tasks=0>

是否可以获取计划任务的实际Task对象?特别是,我有兴趣获得 args 而不必解析或尝试 ast.literal_eval().

我不确定您能否真正获得实际的 Task 实例,但您可以通过简单地使用要检查的任务 ID 对其进行实例化来轻松创建 AsyncResult,并且您需要(自然地)通过Celery 应用程序也反对它。

一些伪代码:

from celery.result import AsyncResult
from my.project.celeryapp import myapp

task_res = AsyncResult("9ed888fe-f6b6-4443-85d3-787c5c1b26b0", app=myapp)
print(task_res.state)