Celery 和 RabbitMQ 我没有收到任务的更新
Celery and RabbitMQ I am not receiving updates for my task
你好吗?我正在研究 django 2.2、celery 4.4.2 和
在我的 tasks.py 文件中,我有以下代码
@shared_task
def start_task(dict):
objectLogic = LogicProcess()
# print('dict', dict)
task = objectLogic.RunProcess(parameters=dict)
print("Estado de la tarea: {}\r".format(task))
return task
它调用了一个函数,在它结束时我更新了它,但我没有收到响应。
current_task.update_state(
state = 'PROGRESS',
meta = {
'current': oer_number,
'total': TotalOER,
'percent': int((oer_number * 100) / TotalOER)
}
)
啊是的...在任务结束时我完成了return
return {'current': TotalOER, 'total': TotalOER, 'percent': 100}
好吧,如果我得到一个(第一次更新)我就看不到其他人了
celery -A ost worker -l info -n worker
task 528454cb-724e-48bc-b95e-9d3a124b22e1
Task status {}
task 528454cb-724e-48bc-b95e-9d3a124b22e1
Task status {'state': 'PROGRESS', 'result': {'current': 2, 'total': 66, 'percent': 3}}
或
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 2
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 74
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 110
或者只是它在工作时调用的那个,只是状态不 return 我
在两个文件 tasks.py 和 function.py import
from celery import shared_task, current_task
celery.py 文件包含
# from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ost.settings')
app = Celery('ost', backend='amqp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
在 django 看来我是这样称呼它的
from celery.result import AsyncResult
task = start_task.AsyncResult(task_id)
data = {}
if isinstance(task.result, dict):
data['state'] = task.state
data['result'] = task.result
我用 if task.ready()
试过了,但它不满足条件,尽管正如我已经提到的,如果它结束函数并且 returns
无论如何,我不知道还有什么要指出的,任何想法都会被测试,有些东西我会回顾link 1, link 2, link 2
PS:我在 for 循环中更新任务状态
正如我提到的,更新是在一个 FOR 循环中进行的,为此我在找到 AsyncResult
的地方创建了一个 Recursive functions
,在我的例子中是 django 视图。这将检索任务的状态。当 oer_number = TotalOER.
时,我结束它 (task.state = SUCCESS
)
从透视角度看什么是合乎逻辑的,更新 FOR
循环 恢复 recursive functions
嗯,一个人忘记的事情。所以它只恢复一个值
你好吗?我正在研究 django 2.2、celery 4.4.2 和 在我的 tasks.py 文件中,我有以下代码
@shared_task
def start_task(dict):
objectLogic = LogicProcess()
# print('dict', dict)
task = objectLogic.RunProcess(parameters=dict)
print("Estado de la tarea: {}\r".format(task))
return task
它调用了一个函数,在它结束时我更新了它,但我没有收到响应。
current_task.update_state(
state = 'PROGRESS',
meta = {
'current': oer_number,
'total': TotalOER,
'percent': int((oer_number * 100) / TotalOER)
}
)
啊是的...在任务结束时我完成了return
return {'current': TotalOER, 'total': TotalOER, 'percent': 100}
好吧,如果我得到一个(第一次更新)我就看不到其他人了
celery -A ost worker -l info -n worker
task 528454cb-724e-48bc-b95e-9d3a124b22e1
Task status {}
task 528454cb-724e-48bc-b95e-9d3a124b22e1
Task status {'state': 'PROGRESS', 'result': {'current': 2, 'total': 66, 'percent': 3}}
或
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 2
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 74
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 110
或者只是它在工作时调用的那个,只是状态不 return 我
在两个文件 tasks.py 和 function.py import
from celery import shared_task, current_task
celery.py 文件包含
# from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ost.settings')
app = Celery('ost', backend='amqp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
在 django 看来我是这样称呼它的
from celery.result import AsyncResult
task = start_task.AsyncResult(task_id)
data = {}
if isinstance(task.result, dict):
data['state'] = task.state
data['result'] = task.result
我用 if task.ready()
试过了,但它不满足条件,尽管正如我已经提到的,如果它结束函数并且 returns
无论如何,我不知道还有什么要指出的,任何想法都会被测试,有些东西我会回顾link 1, link 2, link 2
PS:我在 for 循环中更新任务状态
正如我提到的,更新是在一个 FOR 循环中进行的,为此我在找到 AsyncResult
的地方创建了一个 Recursive functions
,在我的例子中是 django 视图。这将检索任务的状态。当 oer_number = TotalOER.
task.state = SUCCESS
)
从透视角度看什么是合乎逻辑的,更新 FOR
循环 恢复 recursive functions
嗯,一个人忘记的事情。所以它只恢复一个值