Kafka 分区中的消息分布不均匀
Uneven Distribution of messages in Kafka Partitions
我有一个主题,有 10 个分区,1 个消费者组和 4 个消费者,工人人数为 3。
我可以看到分区中的消息分布不均匀,一个分区有这么多数据而另一个分区是空闲的。
如何让我的生产者将负载平均分配到所有分区,以便所有分区都得到正确利用?
看来您的问题是消息消费不均衡,而不是向 Kafka 主题生成消息不均衡。换句话说,您的读取线程数量与您拥有的分区数量不匹配(尽管它们不需要匹配 1:1,每个消费者线程读取的分区数量相同)。
See 简短说明以获取更多详细信息。
根据 DefaultPartitioner class 本身的 JavaDoc 注释,默认分区策略是:
- 如果记录中指定了分区,则使用它。
- 如果未指定分区但存在键,请根据键的哈希选择分区。
- 如果不存在分区或键,则以循环方式选择一个分区。
所以这里有两个可能导致分布不均的原因,具体取决于您是否在生成消息时指定了密钥:
如果您指定了一个密钥,并且使用 DefaultPartitioner 时分布不均,最明显的解释是您多次指定了同一个密钥。
如果您未指定密钥并使用 DefaultPartitioner,则可能会发生不明显的行为。根据上面的内容,您会期望消息的循环分发,但事实并非如此。 0.8.0 中引入的优化可能会导致使用相同的分区。查看此 link 以获得更详细的解释:https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified? .
可以利用producer record的key参数。这是一件事,对于一个特定的键,数据现在总是进入同一个分区,我不知道你的生产者记录的结构,但正如你所说你有 10 个分区,那么你可以简单地使用 n%10 作为你的生产者记录键。
现在 n 是 0 到 9,你的记录 0 键将为 0,然后 kafka 将生成一个哈希键并将其放在某个分区中,比如分区 0,对于记录 1,它将是一个,然后它将进入第一个分区等等。
通过这种方式,您将能够在生产者记录上应用循环法,您的密钥将独立于记录中的字段,因此您可以拥有变量 n 和密钥作为 n%10。
或者您可以在生产者记录中指定分区。因此,您要么使用生产者记录的键,要么使用分区字段。
您可以为生产者分配一个分区号,而不是使用默认分区程序 class,这样消息就可以直接发送到指定的分区,
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, partitionNumber,key, value);
如果您从记录中定义了分区器,假设在 Kafka 中键是字符串,值是学生 Pojo。
在学生 Pojo 中,假设基于学生国家/地区字段,我想进入特定分区。假设一个主题中有 10 个分区,例如,在值中,"India" 是一个国家,根据 "India" 我们得到分区号 5。
每当国家/地区为"India"时,Kafka 将分配第 5 个分区,并且该记录始终转到第 5 个分区(如果分区未更改)。
假设在你的管道中有很多记录即将到来并且有一个国家"India",所有这些记录都将进入分区号 5,你会看到 Kafka 分区中的分布不均匀。
在我的例子中,我使用了默认的分区程序,但一个分区中的记录仍然比其他分区中的记录多得多。问题是我出乎意料地有许多使用相同密钥的记录。检查您的钥匙!
因为我无法用 Faust 解决这个问题,所以我使用的方法是自己实现 'round-robin' 分布。
我遍历我的记录以生成并执行例如:
for index, message in enumerate(messages):
topic.send(message, partition=index % num_partitions)
即将我的索引绑定到我拥有的分区范围内。
可能仍然存在不平衡 - 反复考虑你 运行 但是你的记录数量少于你的 num_partitions
- 那么你的第一个分区将继续获得主要的消息份额。您可以通过添加随机偏移量来避免此问题:
import random
initial_partition = random.randrange(0, num_partitions)
for index, message in enumerate(messages):
topic.send(message, partition=(initial_partition + index) % num_partitions)
我有一个主题,有 10 个分区,1 个消费者组和 4 个消费者,工人人数为 3。
我可以看到分区中的消息分布不均匀,一个分区有这么多数据而另一个分区是空闲的。
如何让我的生产者将负载平均分配到所有分区,以便所有分区都得到正确利用?
看来您的问题是消息消费不均衡,而不是向 Kafka 主题生成消息不均衡。换句话说,您的读取线程数量与您拥有的分区数量不匹配(尽管它们不需要匹配 1:1,每个消费者线程读取的分区数量相同)。
See 简短说明以获取更多详细信息。
根据 DefaultPartitioner class 本身的 JavaDoc 注释,默认分区策略是:
- 如果记录中指定了分区,则使用它。
- 如果未指定分区但存在键,请根据键的哈希选择分区。
- 如果不存在分区或键,则以循环方式选择一个分区。
所以这里有两个可能导致分布不均的原因,具体取决于您是否在生成消息时指定了密钥:
如果您指定了一个密钥,并且使用 DefaultPartitioner 时分布不均,最明显的解释是您多次指定了同一个密钥。
如果您未指定密钥并使用 DefaultPartitioner,则可能会发生不明显的行为。根据上面的内容,您会期望消息的循环分发,但事实并非如此。 0.8.0 中引入的优化可能会导致使用相同的分区。查看此 link 以获得更详细的解释:https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified? .
可以利用producer record的key参数。这是一件事,对于一个特定的键,数据现在总是进入同一个分区,我不知道你的生产者记录的结构,但正如你所说你有 10 个分区,那么你可以简单地使用 n%10 作为你的生产者记录键。 现在 n 是 0 到 9,你的记录 0 键将为 0,然后 kafka 将生成一个哈希键并将其放在某个分区中,比如分区 0,对于记录 1,它将是一个,然后它将进入第一个分区等等。 通过这种方式,您将能够在生产者记录上应用循环法,您的密钥将独立于记录中的字段,因此您可以拥有变量 n 和密钥作为 n%10。
或者您可以在生产者记录中指定分区。因此,您要么使用生产者记录的键,要么使用分区字段。
您可以为生产者分配一个分区号,而不是使用默认分区程序 class,这样消息就可以直接发送到指定的分区,
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, partitionNumber,key, value);
如果您从记录中定义了分区器,假设在 Kafka 中键是字符串,值是学生 Pojo。
在学生 Pojo 中,假设基于学生国家/地区字段,我想进入特定分区。假设一个主题中有 10 个分区,例如,在值中,"India" 是一个国家,根据 "India" 我们得到分区号 5。
每当国家/地区为"India"时,Kafka 将分配第 5 个分区,并且该记录始终转到第 5 个分区(如果分区未更改)。
假设在你的管道中有很多记录即将到来并且有一个国家"India",所有这些记录都将进入分区号 5,你会看到 Kafka 分区中的分布不均匀。
在我的例子中,我使用了默认的分区程序,但一个分区中的记录仍然比其他分区中的记录多得多。问题是我出乎意料地有许多使用相同密钥的记录。检查您的钥匙!
因为我无法用 Faust 解决这个问题,所以我使用的方法是自己实现 'round-robin' 分布。
我遍历我的记录以生成并执行例如:
for index, message in enumerate(messages):
topic.send(message, partition=index % num_partitions)
即将我的索引绑定到我拥有的分区范围内。
可能仍然存在不平衡 - 反复考虑你 运行 但是你的记录数量少于你的 num_partitions
- 那么你的第一个分区将继续获得主要的消息份额。您可以通过添加随机偏移量来避免此问题:
import random
initial_partition = random.randrange(0, num_partitions)
for index, message in enumerate(messages):
topic.send(message, partition=(initial_partition + index) % num_partitions)