Kafka如何通过Spark均匀分布数据产生消息?
How to distribute data evenly in Kafka producing messages through Spark?
我有一个将数据写入 Kafka 的流作业,我注意到其中一个 Kafka 分区 (#3) 比其他分区占用更多数据。
+-----------------------------------------------------+
| partition | messages | earlist offset | next offset|
+-----------------------------------------------------+
|1 | 166522754 | 5861603324 | 6028126078 |
|2 | 152251127 | 6010226633 | 6162477760 |
|3 | 382935293 | 6332944925 | 6715880218 |
|4 | 188126274 | 6171311709 | 6359437983 |
|5 | 188270700 | 6100140089 | 6288410789 |
+-----------------------------------------------------+
我找到了一个选项——使用 Kafka 分区数 (5) 重新分区输出数据集。
有没有其他方法可以均匀分布数据?
数据在 Kafka 中的分区方式与数据在 Spark 及其数据集中的分区方式无关。从 Kafka 的角度来看,它取决于消息的键,或者您在写入 Kafka 时应用自定义分区程序 class。
Kafka数据分区有以下几种场景:
消息键为空且没有自定义分区程序
如果 Kafka 消息中没有定义键,Kafka 将以循环方式在所有分区中分发消息。
消息键不为空且没有自定义分区程序
如果你提供消息键,默认情况下,Kafka会根据
决定分区
hash(key) % numer_of_partitions
提供自定义分区程序
如果您想完全控制 Kafka 如何在主题的分区中存储消息,您可以编写自己的分区程序 class 并将其设置为生产者配置中的 partitioner.class
。
这是客户分区器 class 可能喜欢的示例
public class MyPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public void close() {}
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("Record did not have a string Key");
if (((String) key).equals("myKey"))
return 0; // This key will always go to Partition 0
// Other records will go to the rest of the Partitions using a hashing function
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) + 1;
}
}
我有一个将数据写入 Kafka 的流作业,我注意到其中一个 Kafka 分区 (#3) 比其他分区占用更多数据。
+-----------------------------------------------------+
| partition | messages | earlist offset | next offset|
+-----------------------------------------------------+
|1 | 166522754 | 5861603324 | 6028126078 |
|2 | 152251127 | 6010226633 | 6162477760 |
|3 | 382935293 | 6332944925 | 6715880218 |
|4 | 188126274 | 6171311709 | 6359437983 |
|5 | 188270700 | 6100140089 | 6288410789 |
+-----------------------------------------------------+
我找到了一个选项——使用 Kafka 分区数 (5) 重新分区输出数据集。
有没有其他方法可以均匀分布数据?
数据在 Kafka 中的分区方式与数据在 Spark 及其数据集中的分区方式无关。从 Kafka 的角度来看,它取决于消息的键,或者您在写入 Kafka 时应用自定义分区程序 class。
Kafka数据分区有以下几种场景:
消息键为空且没有自定义分区程序
如果 Kafka 消息中没有定义键,Kafka 将以循环方式在所有分区中分发消息。
消息键不为空且没有自定义分区程序
如果你提供消息键,默认情况下,Kafka会根据
决定分区hash(key) % numer_of_partitions
提供自定义分区程序
如果您想完全控制 Kafka 如何在主题的分区中存储消息,您可以编写自己的分区程序 class 并将其设置为生产者配置中的 partitioner.class
。
这是客户分区器 class 可能喜欢的示例
public class MyPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public void close() {}
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("Record did not have a string Key");
if (((String) key).equals("myKey"))
return 0; // This key will always go to Partition 0
// Other records will go to the rest of the Partitions using a hashing function
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) + 1;
}
}