Kafka 分区中的消息分布不均匀

Uneven Distribution of messages in Kafka Partitions

我有一个主题,有 10 个分区,1 个消费者组和 4 个消费者,工人人数为 3。

我可以看到分区中的消息分布不均匀,一个分区有这么多数据而另一个分区是空闲的。

如何让我的生产者将负载平均分配到所有分区,以便所有分区都得到正确利用?

看来您的问题是消息消费不均衡,而不是向 Kafka 主题生成消息不均衡。换句话说,您的读取线程数量与您拥有的分区数量不匹配(尽管它们不需要匹配 1:1,每个消费者线程读取的分区数量相同)。

See 简短说明以获取更多详细信息。

根据 DefaultPartitioner class 本身的 JavaDoc 注释,默认分区策略是:

  • 如果记录中指定了分区,则使用它。
  • 如果未指定分区但存在键,请根据键的哈希选择分区。
  • 如果不存在分区或键,则以循环方式选择一个分区。

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

所以这里有两个可能导致分布不均的原因,具体取决于您是否在生成消息时指定了密钥:

可以利用producer record的key参数。这是一件事,对于一个特定的键,数据现在总是进入同一个分区,我不知道你的生产者记录的结构,但正如你所说你有 10 个分区,那么你可以简单地使用 n%10 作为你的生产者记录键。 现在 n 是 0 到 9,你的记录 0 键将为 0,然后 kafka 将生成一个哈希键并将其放在某个分区中,比如分区 0,对于记录 1,它将是一个,然后它将进入第一个分区等等。 通过这种方式,您将能够在生产者记录上应用循环法,您的密钥将独立于记录中的字段,因此您可以拥有变量 n 和密钥作为 n%10。

或者您可以在生产者记录中指定分区。因此,您要么使用生产者记录的键,要么使用分区字段。

您可以为生产者分配一个分区号,而不是使用默认分区程序 class,这样消息就可以直接发送到指定的分区,

 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, partitionNumber,key, value);

如果您从记录中定义了分区器,假设在 Kafka 中键是字符串,值是学生 Pojo。

在学生 Pojo 中,假设基于学生国家/地区字段,我想进入特定分区。假设一个主题中有 10 个分区,例如,在值中,"India" 是一个国家,根据 "India" 我们得到分区号 5。

每当国家/地区为"India"时,Kafka 将分配第 5 个分区,并且该记录始终转到第 5 个分区(如果分区未更改)。

假设在你的管道中有很多记录即将到来并且有一个国家"India",所有这些记录都将进入分区号 5,你会看到 Kafka 分区中的分布不均匀。

在我的例子中,我使用了默认的分区程序,但一个分区中的记录仍然比其他分区中的记录多得多。问题是我出乎意料地有许多使用相同密钥的记录。检查您的钥匙!

因为我无法用 Faust 解决这个问题,所以我使用的方法是自己实现 'round-robin' 分布。

我遍历我的记录以生成并执行例如:

for index, message in enumerate(messages):
    topic.send(message, partition=index % num_partitions)

即将我的索引绑定到我拥有的分区范围内。

可能仍然存在不平衡 - 反复考虑你 运行 但是你的记录数量少于你的 num_partitions - 那么你的第一个分区将继续获得主要的消息份额。您可以通过添加随机偏移量来避免此问题:

import random
initial_partition = random.randrange(0, num_partitions)
for index, message in enumerate(messages):
    topic.send(message, partition=(initial_partition + index) % num_partitions)