Celery 和 RabbitMQ 我没有收到任务的更新

Celery and RabbitMQ I am not receiving updates for my task

你好吗?我正在研究 django 2.2、celery 4.4.2 和 在我的 tasks.py 文件中,我有以下代码

@shared_task
def start_task(dict):
    objectLogic = LogicProcess()
    # print('dict', dict)
    task = objectLogic.RunProcess(parameters=dict)
    print("Estado de la tarea: {}\r".format(task))
    return task

它调用了一个函数,在它结束时我更新了它,但我没有收到响应。

current_task.update_state(
                state = 'PROGRESS',
                meta = {
                    'current': oer_number,
                    'total': TotalOER,
                    'percent': int((oer_number * 100) / TotalOER)
                }
            )

啊是的...在任务结束时我完成了return

return {'current': TotalOER, 'total': TotalOER, 'percent': 100}

好吧,如果我得到一个(第一次更新)我就看不到其他人了

celery -A ost worker -l info -n worker
task 528454cb-724e-48bc-b95e-9d3a124b22e1
Task status     {}
task 528454cb-724e-48bc-b95e-9d3a124b22e1
Task status     {'state': 'PROGRESS', 'result': {'current': 2, 'total': 66, 'percent': 3}}

task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 2
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 74
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 110

或者只是它在工作时调用的那个,只是状态不 return 我

在两个文件 tasks.pyfunction.py import

from celery import shared_task, current_task

celery.py 文件包含

# from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ost.settings')

app = Celery('ost', backend='amqp')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

在 django 看来我是这样称呼它的

from celery.result import AsyncResult

task = start_task.AsyncResult(task_id)
            data = {}
            if isinstance(task.result, dict):
                data['state'] = task.state
                data['result'] = task.result

我用 if task.ready() 试过了,但它不满足条件,尽管正如我已经提到的,如果它结束函数并且 returns

无论如何,我不知道还有什么要指出的,任何想法都会被测试,有些东西我会回顾link 1, link 2, link 2

PS:我在 for 循环中更新任务状态

正如我提到的,更新是在一个 FOR 循环中进行的,为此我在找到 AsyncResult 的地方创建了一个 Recursive functions,在我的例子中是 django 视图。这将检索任务的状态。当 oer_number = TotalOER.

时,我结束它 (task.state = SUCCESS)

从透视角度看什么是合乎逻辑的,更新 FOR 循环 恢复 recursive functions嗯,一个人忘记的事情。所以它只恢复一个值