Python Celery 获取任务状态
Python Celery get task status
拥有此代码并使用 RabbitMQ 设置 Celery
任务已创建并执行。我得到任务 uuid,但不知何故无法检查任务状态
from flask_oidc import OpenIDConnect
from flask import Flask, json, g, request
from flask_cors import CORS
from celery import Celery
import os
import time
app = Flask(__name__)
app.config.update({
'OIDC_CLIENT_SECRETS': './client_secrets.json',
'OIDC_RESOURCE_SERVER_ONLY': True,
'CELERY_BROKER_URL': 'amqp://rabbitmq:rabbitmq@rabbit:5672/',
'CELERY_RESULT_BACKEND': 'rpc://'
})
oidc = OpenIDConnect(app)
CORS(app)
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)
@client.task()
def removeUser(id):
time.sleep(10);
@app.route("/removeUser", methods=['GET'])
def removeUserID():
id = request.args.get('id')
result = removeUser.apply_async(args=[id], countdown=60)
return result.id
@app.route("/checkStatus", methods=['GET'])
def checkStatus():
uuid = request.args.get('uuid')
res = removeUser.AsyncResult(uuid)
print(uuid)
if(res=="SUCCESS"):
return "success"
else:
return "progress"
def json_response(payload, status=200):
return (json.dumps(payload), status, {'content-type': 'application/json'})
res = removeUser.AsyncResult(uuid) 似乎没有获取任务状态。如何正确获取?
AsyncResult
是一个对象,state property是你想要的:
@app.route("/checkStatus", methods=['GET'])
def checkStatus():
uuid = request.args.get('uuid')
res = removeUser.AsyncResult(uuid)
print(uuid)
if res.state == "SUCCESS":
return "success"
else:
return "progress"
备注:
- 任务也可能会失败,因此有更好的方法来检查任务是否已执行(而不是与“成功”进行比较)。
- 如果您要添加 celery 应用程序,则不必使用特定任务,此功能可以帮助您通过 uuid 获取任何 celery 任务的状态。
@app.route("/checkStatus", methods=['GET'])
def checkStatus():
uuid = request.args.get('uuid')
res = AsyncResult(uuid, app=client)
print(uuid)
if res.ready():
return "success"
else:
return "progress"
拥有此代码并使用 RabbitMQ 设置 Celery
任务已创建并执行。我得到任务 uuid,但不知何故无法检查任务状态
from flask_oidc import OpenIDConnect
from flask import Flask, json, g, request
from flask_cors import CORS
from celery import Celery
import os
import time
app = Flask(__name__)
app.config.update({
'OIDC_CLIENT_SECRETS': './client_secrets.json',
'OIDC_RESOURCE_SERVER_ONLY': True,
'CELERY_BROKER_URL': 'amqp://rabbitmq:rabbitmq@rabbit:5672/',
'CELERY_RESULT_BACKEND': 'rpc://'
})
oidc = OpenIDConnect(app)
CORS(app)
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)
@client.task()
def removeUser(id):
time.sleep(10);
@app.route("/removeUser", methods=['GET'])
def removeUserID():
id = request.args.get('id')
result = removeUser.apply_async(args=[id], countdown=60)
return result.id
@app.route("/checkStatus", methods=['GET'])
def checkStatus():
uuid = request.args.get('uuid')
res = removeUser.AsyncResult(uuid)
print(uuid)
if(res=="SUCCESS"):
return "success"
else:
return "progress"
def json_response(payload, status=200):
return (json.dumps(payload), status, {'content-type': 'application/json'})
res = removeUser.AsyncResult(uuid) 似乎没有获取任务状态。如何正确获取?
AsyncResult
是一个对象,state property是你想要的:
@app.route("/checkStatus", methods=['GET'])
def checkStatus():
uuid = request.args.get('uuid')
res = removeUser.AsyncResult(uuid)
print(uuid)
if res.state == "SUCCESS":
return "success"
else:
return "progress"
备注:
- 任务也可能会失败,因此有更好的方法来检查任务是否已执行(而不是与“成功”进行比较)。
- 如果您要添加 celery 应用程序,则不必使用特定任务,此功能可以帮助您通过 uuid 获取任何 celery 任务的状态。
@app.route("/checkStatus", methods=['GET'])
def checkStatus():
uuid = request.args.get('uuid')
res = AsyncResult(uuid, app=client)
print(uuid)
if res.ready():
return "success"
else:
return "progress"