使用 RabbitMQ 进行批处理的最佳实践

Best Practice for Batch Processing with RabbitMQ

我正在寻找使用 Python 预制 ETL 的最佳方法。

我在 RabbitMQ 中有一个发送事件的通道(甚至可以每秒发送一次)。 我想处理每 1000 个。 主要问题是 RabbitMQ 接口(我使用的是鼠兔)在每条消息上都会引发回调。 我查看了 Celery 框架,但是批处理功能在版本 3 中被贬低了。

最好的方法是什么?我考虑将我的事件保存在列表中,当它达到 1000 时将其复制到其他列表并执行我的处理。但是,如何使其成为线程安全的?不想丢事件,又怕同步列表的时候丢事件

这听起来像是一个非常简单的用例,但是我没有找到任何好的最佳实践。

How do I make it thread-safe?

如何设置消费者prefetch-count=1000。如果消费者的 unack 消息达到其预取限制,rabbitmq 将不会向其传递任何消息。

不要ACK收到消息,直到您有1000条消息,然后将其复制到其他列表并执行您的处理。完成工作后,ACK 最后一条消息,以及 all message before this message will be ACK by rabbitmq server

但我不确定大预取是否是最佳实践。

首先,你不应该 "batch" 来自 RabbitMQ 的消息,除非你真的必须这样做。处理消息传递的最有效方法是独立处理每条消息。

如果您需要批量合并消息,我会使用单独的数据存储来临时存储消息,然后在达到特定条件时处理它们。每次将项目添加到批次时,您都会检查该条件(例如,您达到了 1000 条消息)并触发批次的处理。

这比在内存中保留一个列表要好,因为如果您的服务终止,消息仍会保留在数据库中。

注意:如果每个队列只有一个处理器,这可以在没有任何同步机制的情况下工作。如果您有多个处理器,则需要实施某种锁定机制。