如何在芹菜中为每个用户生成队列?

How to generate Queues per user inside celery?

所以我正在尝试将 Web 请求中的阻塞内容作为后台任务移动并利用队列。我也是消息传递和 pub/sub 的新手。用户将数据推送到那里并进行处理,稍后会通知用户。我为此做了一个 celery 设置,发现它不能满足我的用例,即为每个用户设置私人队列来完成他们自己的任务。

我尝试指定缺少队列的创建和工作人员生成期间(发送队列名称以逗号分隔),并将它们列在队列设置中,如之前在互联网上对 "dynamic queue creation with celery" 的回答所述。它会创建队列,但当我在设置和命令行中指定的队列名称与指定名称不同时,它不会创建队列。解决方案是使用不满足用例的队列名称生成更多工作人员,因为将有数百万个数据处理请求。

我发现 python-rq 具有队列对象初始化及其名称,我认为这会创建新队列。如果是,转向 RQ 是否正确?

redis_conn = Redis()
q = Queue('some_queue', connection=redis_conn)

我想要的是每个用户在后台为自己的任务排队。我没有在 celery 中看到任何在线创建动态队列的解决方案(没有在命令行中指定队列名称或设置)。 python-rq 似乎有那个解决方案。我的权衡是从 RabbitMQ 和 celry 转移到 redis。

有没有办法在 celery 中真正做到每个用户队列?如果是,请列出步骤。还是这种设计模式不对? pubsub 会满足用例吗?

Celery worker 仅从设置中启用的 task_queues setting or given on command line with -Q option. However, this can be changed dynamically, either from command line or from code. Just be sure to have task_create_missing_queues 定义的队列中消费(这是默认设置),因此当您开始使用它们时会自动创建新队列。

因此,在为给定用户发送任务之前,您必须指示工作人员开始从用户队列中消费。这可以通过使用 add_consumer control command, or from the code using app.control.add_consumer() 方法从命令行实现。这些操作是幂等的,因此如果工作人员已经从队列中消费,则什么也不会发生。如果队列尚不存在,则会自动创建。