使用 Kafka 处理大消息

Handling Large Messages with Kafka

如何在 Kafka 中处理超过 20MB 等大消息

[2019-03-13 08:59:10,923] 向主题测试发送消息时出错,键:13 字节,值:11947696 字节,错误:(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.RecordTooLargeException: 请求包含的消息大于服务器将接受的最大消息大小。

[2019-03-13 03:59:14,478] 向主题测试发送消息时出错,键:13 字节,值:11947696 字节,错误:(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.RecordTooLargeException:序列化后的消息为 11947797 字节,大于您使用 max.request.size 配置配置的最大请求大小。

我们需要设置以下配置

经纪人

replica.fetch.max.bytes:对此 属性 的更改将允许代理中的副本在集群内发送消息并确保消息被复制正确。如果太小,则永远不会复制该消息,因此,消费者永远不会看到该消息,因为该消息永远不会被提交(完全复制)。

message.max.bytes:这是代理可以从生产者接收的最大消息大小。

经纪人(话题)

max.message.bytes:Kafka允许的最大记录batch size。如果此值增加并且存在早于 0.10.2 的消费者,则消费者的获取大小也必须增加,以便他们可以获取这么大的记录批次。在最新的消息格式版本中,为了提高效率,记录总是被分组到批次中。在以前的消息格式版本中,未压缩的记录不会分组到批次中,并且此限制仅适用于这种情况下的单个记录(默认为代理的 message.max.bytes)。

制作人

max.request.size: 请求的最大字节数。此设置将限制生产者在单个请求中发送的记录批次数量,以避免发送大量请求。这也是最大记录批大小的有效上限。请注意,服务器有自己的记录批量大小上限,可能与此不同。

compression.type:设置为snappy,这会增加单个请求可以发送的数据总量,应该搭配更大的batch.size.

buffer.memory: 如果启用压缩,缓冲区大小也应该增加。

batch.size: 批量大小应该至少为 10 KB,减少 return 可以看到大约 300 kb(远程客户端更少) .批次越大,压缩率也越高。

linger.ms: linger.ms 抢占了批处理大小的任何界限。增加此值以确保在较慢的生产时间期间不会发送较小的批次

消费者

fetch.message.max.bytes: 这将决定消费者可以获取的消息的最大大小。

max.partition.fetch.bytes: 服务器每个分区的最大数据量 return.