使用Celey+RabbitMQ+Redis实现订单管理服务

Implementation of order management service using Celey+RabbitMQ+Redis

我已经使用 zipstream 实现了将一些文件打包成 .zip 文件的功能,因此响应的格式是 mimetype='application/zip' 使用 烧瓶。现在我想实现提交订单(即生成.zip文件的任务)、跟踪订单状态(例如STARTEDSUCCESSFAILURE)、撤销订单的服务(即取消生成 .zip 文件或删除生成的 .zip 文件的任务),以及为当前用户下载订单(即 .zip 文件)。我计划使用 CeleryRabbitMQ 作为消息代理,Redis 作为结果后端。

问题来了。仅 Redis 作为结果后端就足够了吗?因为用户订单的跟踪状态似乎涉及 user = ... 之类的查询,而不是 task id

AsyncResult 查询的支持

更新: 感谢@Tomáš Linhart。我遵循使用芹菜 signal handler mechanismMongoDB 中存储结果的扩展方式。这是与我创建的任务相关的代码片段。我创建 task_trackercelery_cli 实例的方式与 @Tomáš Linhart 的 answer.

中的 trackerapp 相同
# Import task_tracker and celery_cli here
...

@task_tracker.track
@celery_cli.task(name='pack-up-tif')
def async_pack_up_tif(**kwargs):
    # Some processing here
    ...

def pack_up_tif(msg):
    result = async_pack_up_tif.delay(**msg)
    return result

但我还有一个问题。如何在调用 delay 方法时拦截任务 ID?因为当 task_success 信号被触发时,我需要通过 find_one_and_update 将该信息存储在 MongoDB 的集合中。

详细说明我发布的评论并回答您更新的问题。这有点涉及确定从哪里收集您想要在每个信号中跟踪的各种数据。一些信号为您提供任务本身,一些信号为您提供任务请求。

我以 task_success 信号的这个处理程序结束:

def _on_task_success(self, sender, result, **other_kwargs):
    if sender.name not in self.tasks:
        return

    collection = self.mongo \
        .get_database(self.config['mongodb']['database']) \
        .get_collection(self.config['mongodb']['collection'])
    collection.find_one_and_update(
        {'_id': sender.request.id},
        {
            '$setOnInsert': {
                'name': sender.name,
                'args': sender.request.args,
                'kwargs': sender.request.kwargs
            },
            '$set': {
                'status': states.SUCCESS,
                'date_done': datetime.datetime.utcnow(),
                'retries': sender.request.retries,
                'group_id': sender.request.group,
                'chord_id': sender.request.chord,
                'root_id': sender.request.root_id,
                'parent_id': sender.request.parent_id,
                'result': result
            },
            '$push': {
                'status_history': {
                    'date': datetime.datetime.utcnow(),
                    'status': states.SUCCESS
                }
            }
        },
        upsert=True,
        return_document=ReturnDocument.AFTER)