使用Celey+RabbitMQ+Redis实现订单管理服务
Implementation of order management service using Celey+RabbitMQ+Redis
我已经使用 zipstream
实现了将一些文件打包成 .zip
文件的功能,因此响应的格式是 mimetype='application/zip'
使用 烧瓶。现在我想实现提交订单(即生成.zip
文件的任务)、跟踪订单状态(例如STARTED
、SUCCESS
、FAILURE
)、撤销订单的服务(即取消生成 .zip
文件或删除生成的 .zip
文件的任务),以及为当前用户下载订单(即 .zip
文件)。我计划使用 Celery,RabbitMQ 作为消息代理,Redis 作为结果后端。
问题来了。仅 Redis 作为结果后端就足够了吗?因为用户订单的跟踪状态似乎涉及 user = ...
之类的查询,而不是 task id
对 AsyncResult
查询的支持
更新:
感谢@Tomáš Linhart。我遵循使用芹菜 signal handler mechanism
在 MongoDB 中存储结果的扩展方式。这是与我创建的任务相关的代码片段。我创建 task_tracker
和 celery_cli
实例的方式与 @Tomáš Linhart 的 answer.
中的 tracker
和 app
相同
# Import task_tracker and celery_cli here
...
@task_tracker.track
@celery_cli.task(name='pack-up-tif')
def async_pack_up_tif(**kwargs):
# Some processing here
...
def pack_up_tif(msg):
result = async_pack_up_tif.delay(**msg)
return result
但我还有一个问题。如何在调用 delay
方法时拦截任务 ID?因为当 task_success
信号被触发时,我需要通过 find_one_and_update
将该信息存储在 MongoDB 的集合中。
详细说明我发布的评论并回答您更新的问题。这有点涉及确定从哪里收集您想要在每个信号中跟踪的各种数据。一些信号为您提供任务本身,一些信号为您提供任务请求。
我以 task_success
信号的这个处理程序结束:
def _on_task_success(self, sender, result, **other_kwargs):
if sender.name not in self.tasks:
return
collection = self.mongo \
.get_database(self.config['mongodb']['database']) \
.get_collection(self.config['mongodb']['collection'])
collection.find_one_and_update(
{'_id': sender.request.id},
{
'$setOnInsert': {
'name': sender.name,
'args': sender.request.args,
'kwargs': sender.request.kwargs
},
'$set': {
'status': states.SUCCESS,
'date_done': datetime.datetime.utcnow(),
'retries': sender.request.retries,
'group_id': sender.request.group,
'chord_id': sender.request.chord,
'root_id': sender.request.root_id,
'parent_id': sender.request.parent_id,
'result': result
},
'$push': {
'status_history': {
'date': datetime.datetime.utcnow(),
'status': states.SUCCESS
}
}
},
upsert=True,
return_document=ReturnDocument.AFTER)
我已经使用 zipstream
实现了将一些文件打包成 .zip
文件的功能,因此响应的格式是 mimetype='application/zip'
使用 烧瓶。现在我想实现提交订单(即生成.zip
文件的任务)、跟踪订单状态(例如STARTED
、SUCCESS
、FAILURE
)、撤销订单的服务(即取消生成 .zip
文件或删除生成的 .zip
文件的任务),以及为当前用户下载订单(即 .zip
文件)。我计划使用 Celery,RabbitMQ 作为消息代理,Redis 作为结果后端。
问题来了。仅 Redis 作为结果后端就足够了吗?因为用户订单的跟踪状态似乎涉及 user = ...
之类的查询,而不是 task id
AsyncResult
查询的支持
更新:
感谢@Tomáš Linhart。我遵循使用芹菜 signal handler mechanism
在 MongoDB 中存储结果的扩展方式。这是与我创建的任务相关的代码片段。我创建 task_tracker
和 celery_cli
实例的方式与 @Tomáš Linhart 的 answer.
tracker
和 app
相同
# Import task_tracker and celery_cli here
...
@task_tracker.track
@celery_cli.task(name='pack-up-tif')
def async_pack_up_tif(**kwargs):
# Some processing here
...
def pack_up_tif(msg):
result = async_pack_up_tif.delay(**msg)
return result
但我还有一个问题。如何在调用 delay
方法时拦截任务 ID?因为当 task_success
信号被触发时,我需要通过 find_one_and_update
将该信息存储在 MongoDB 的集合中。
详细说明我发布的评论并回答您更新的问题。这有点涉及确定从哪里收集您想要在每个信号中跟踪的各种数据。一些信号为您提供任务本身,一些信号为您提供任务请求。
我以 task_success
信号的这个处理程序结束:
def _on_task_success(self, sender, result, **other_kwargs):
if sender.name not in self.tasks:
return
collection = self.mongo \
.get_database(self.config['mongodb']['database']) \
.get_collection(self.config['mongodb']['collection'])
collection.find_one_and_update(
{'_id': sender.request.id},
{
'$setOnInsert': {
'name': sender.name,
'args': sender.request.args,
'kwargs': sender.request.kwargs
},
'$set': {
'status': states.SUCCESS,
'date_done': datetime.datetime.utcnow(),
'retries': sender.request.retries,
'group_id': sender.request.group,
'chord_id': sender.request.chord,
'root_id': sender.request.root_id,
'parent_id': sender.request.parent_id,
'result': result
},
'$push': {
'status_history': {
'date': datetime.datetime.utcnow(),
'status': states.SUCCESS
}
}
},
upsert=True,
return_document=ReturnDocument.AFTER)