如何在 Tornado 中使用 Celery 和 Redis 作为代理和后端?
How to use Celery in Tornado with redis as broker and backend?
当我尝试在Tornado4.2 中使用celery3.1.11、tornado-celery0.3.5 并产生异常时引发异常。它在没有 yield 的情况下工作,但无法获得异步结果...我也发现当我使用 rabbitmq 作为代理时它工作,而 redis 会引发以下错误...
这是我的代码。
from mycelery import celery_task
import tcelery
tcelery.setup_nonblocking_producer()
token = yield tornado.gen.Task(celery_task.get_rongcloud_token.apply_async,args=[3])
print token
我的 Celery 任务:
from celery import Celery, platforms
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from celery.exceptions import SoftTimeLimitExceeded
platforms.C_FORCE_ROOT = True # linux 下要root用户才不报错
broker = 'redis://:'+settings.REDIS_PASS+'@127.0.0.1:6379/5'
backend = 'redis://:'+settings.REDIS_PASS+'@127.0.0.1:6379/6'
app = Celery('tasks', broker=broker, backend=backend)
@app.task(name='mycelery.celery_task.get_rongcloud_token')
def get_rongcloud_token(user_id):
print 'xxxxx'
a = 'xxx'
return a
这是错误:
TypeError: <function wrapper at 0x5bd2c80> is not JSON serializable
呵呵,求同题:
关于 tcelery
的有限文档没有太多解释,您的示例也没有太多说明。从有限的文档来看,您似乎确实在按预期使用它(除非我遗漏了一些东西),所以我不确定为什么您的代码不起作用。我通过以下方法取得了成功:
class CeleryTasks(web.RequestHandler):
@gen.coroutine
def get(self):
future = concurrent.Future()
celery_task = tasks.get_rongcloud_token.delay(3)
check_status(celery_task, future)
yield future
self.write(future.result())
#self.write(celery_task.result)
def check_status(celery_task, future):
"""
Check the status of the celery task and set the result in the future
"""
if not celery_task.ready():
ioloop.IOLoop.current().call_later(
1,
check_status,
celery_task,
future)
else:
future.set_result(celery_task.result)
首先创建一个裸 Future
,它将产生直到 celery 任务的结果可用。接下来像通常异步执行一样执行 celery 任务(例如 task_fn.delay(*args)
或 task_fn.apply_async(*args)
)。将 celery_task
和 future
传递给函数 (check_status
),该函数将检查任务是否为 "ready",如果不是,则稍后递归调用自身并再次检查.然后yield
Future
直到设置结果。任务完成并且结果可用后,将 Future
设置为结果,然后对结果执行任何需要的操作。
基于notorious.no的回答:
def check_status(celery_task, future):
"""
celery状态检查辅助函数...
Check the status of the celery task and set the result in the future
"""
import tornado.ioloop
if not celery_task.ready():
tornado.ioloop.IOLoop.current().call_later(
0.1, # 100ms
check_status,
celery_task,
future)
else:
future.set_result(celery_task.result)
@tornado.gen.coroutine
def celery_get_rongcloud_token(user_id, name='', portraitUri=''):
from mycelery import celery_task
import tornado.concurrent
future = tornado.concurrent.Future()
get_token_task = celery_task.get_rongcloud_token.delay(user_id, name, portraitUri)
check_status(get_token_task, future)
yield future
token = future.result()
raise tornado.gen.Return(token)
然后在您的代码中:
token = yield celery_get_rongcloud_token(user_id)
当我尝试在Tornado4.2 中使用celery3.1.11、tornado-celery0.3.5 并产生异常时引发异常。它在没有 yield 的情况下工作,但无法获得异步结果...我也发现当我使用 rabbitmq 作为代理时它工作,而 redis 会引发以下错误...
这是我的代码。
from mycelery import celery_task
import tcelery
tcelery.setup_nonblocking_producer()
token = yield tornado.gen.Task(celery_task.get_rongcloud_token.apply_async,args=[3])
print token
我的 Celery 任务:
from celery import Celery, platforms
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from celery.exceptions import SoftTimeLimitExceeded
platforms.C_FORCE_ROOT = True # linux 下要root用户才不报错
broker = 'redis://:'+settings.REDIS_PASS+'@127.0.0.1:6379/5'
backend = 'redis://:'+settings.REDIS_PASS+'@127.0.0.1:6379/6'
app = Celery('tasks', broker=broker, backend=backend)
@app.task(name='mycelery.celery_task.get_rongcloud_token')
def get_rongcloud_token(user_id):
print 'xxxxx'
a = 'xxx'
return a
这是错误:
TypeError: <function wrapper at 0x5bd2c80> is not JSON serializable
呵呵,求同题:
关于 tcelery
的有限文档没有太多解释,您的示例也没有太多说明。从有限的文档来看,您似乎确实在按预期使用它(除非我遗漏了一些东西),所以我不确定为什么您的代码不起作用。我通过以下方法取得了成功:
class CeleryTasks(web.RequestHandler):
@gen.coroutine
def get(self):
future = concurrent.Future()
celery_task = tasks.get_rongcloud_token.delay(3)
check_status(celery_task, future)
yield future
self.write(future.result())
#self.write(celery_task.result)
def check_status(celery_task, future):
"""
Check the status of the celery task and set the result in the future
"""
if not celery_task.ready():
ioloop.IOLoop.current().call_later(
1,
check_status,
celery_task,
future)
else:
future.set_result(celery_task.result)
首先创建一个裸 Future
,它将产生直到 celery 任务的结果可用。接下来像通常异步执行一样执行 celery 任务(例如 task_fn.delay(*args)
或 task_fn.apply_async(*args)
)。将 celery_task
和 future
传递给函数 (check_status
),该函数将检查任务是否为 "ready",如果不是,则稍后递归调用自身并再次检查.然后yield
Future
直到设置结果。任务完成并且结果可用后,将 Future
设置为结果,然后对结果执行任何需要的操作。
基于notorious.no的回答:
def check_status(celery_task, future):
"""
celery状态检查辅助函数...
Check the status of the celery task and set the result in the future
"""
import tornado.ioloop
if not celery_task.ready():
tornado.ioloop.IOLoop.current().call_later(
0.1, # 100ms
check_status,
celery_task,
future)
else:
future.set_result(celery_task.result)
@tornado.gen.coroutine
def celery_get_rongcloud_token(user_id, name='', portraitUri=''):
from mycelery import celery_task
import tornado.concurrent
future = tornado.concurrent.Future()
get_token_task = celery_task.get_rongcloud_token.delay(user_id, name, portraitUri)
check_status(get_token_task, future)
yield future
token = future.result()
raise tornado.gen.Return(token)
然后在您的代码中:
token = yield celery_get_rongcloud_token(user_id)