Kafka如何通过Spark均匀分布数据产生消息?

How to distribute data evenly in Kafka producing messages through Spark?

我有一个将数据写入 Kafka 的流作业,我注意到其中一个 Kafka 分区 (#3) 比其他分区占用更多数据。

+-----------------------------------------------------+
| partition | messages  | earlist offset | next offset|
+-----------------------------------------------------+
|1          | 166522754 | 5861603324     | 6028126078 |
|2          | 152251127 | 6010226633     | 6162477760 |
|3          | 382935293 | 6332944925     | 6715880218 |
|4          | 188126274 | 6171311709     | 6359437983 |
|5          | 188270700 | 6100140089     | 6288410789 |
+-----------------------------------------------------+

我找到了一个选项——使用 Kafka 分区数 (5) 重新分区输出数据集。

有没有其他方法可以均匀分布数据?

数据在 Kafka 中的分区方式与数据在 Spark 及其数据集中的分区方式无关。从 Kafka 的角度来看,它取决于消息的键,或者您在写入 Kafka 时应用自定义分区程序 class。

Kafka数据分区有以下几种场景:

消息键为空且没有自定义分区程序

如果 Kafka 消息中没有定义键,Kafka 将以循环方式在所有分区中分发消息。

消息键不为空且没有自定义分区程序

如果你提供消息键,默认情况下,Kafka会根据

决定分区
hash(key) % numer_of_partitions

提供自定义分区程序

如果您想完全控制 Kafka 如何在主题的分区中存储消息,您可以编写自己的分区程序 class 并将其设置为生产者配置中的 partitioner.class

这是客户分区器 class 可能喜欢的示例

public class MyPartitioner implements Partitioner {
  public void configure(Map<String, ?> configs) {}
  public void close() {}

  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();

    if ((keyBytes == null) || (!(key instanceOf String)))
      throw new InvalidRecordException("Record did not have a string Key");

    if (((String) key).equals("myKey"))
       return 0; // This key will always go to Partition 0

    // Other records will go to the rest of the Partitions using a hashing function
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) + 1;
  }
}