Kafka Producer 发布消息到单个分区
Kafka Producer publishing message to single partition
我是 Kafka 的新手,正在阅读可用的官方文档。
在我的本地系统上,我已经启动了一个 kafka 实例和 zookeeper。 Zookeper 和 kafka 服务器都在默认端口上 运行ning。
我创建了一个主题 "test",复制因子为 1,因为我只有一个 kafka 实例,运行ning。
我还创建了两个分区。
我有两个消费者在同一个消费者组中订阅了这个队列。
现在我已经在 windows 机器上使用命令提示符启动消费者。
当我从命令提示符启动生产者并将消息发布到主题时,一切正常。 Kafka 使用循环将消息推送到两个分区,并且每个消费者交替接收消息,因为他们每个人都在监听不同的分区。
但是当我使用 java kafka-client jar 创建生产者时,即使我对消息使用不同的密钥,生产者也会将所有消息推送到同一个分区,因为所有消息都是在同一个分区上接收的消费者。
分区也不是静态的,每次我 运行 我的制作人时它都会不断变化。
我已经尝试过相同的场景,生产者从命令提示符开始,配置与我使用 java 代码提供给 kafka-client 生产者的配置完全相同。命令提示生产者似乎工作正常,但代码生产者将所有消息推送到同一分区。
我尝试更改某些消息的密钥,希望代理将其发送到不同的分区,因为文档中提到代理使用消息的密钥路由消息。
public class KafkaProducerParallel {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "parallelism-
producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);
Producer<String, Long> parallelProducer = new KafkaProducer<>
(properties);
for(long i=0;i<100;i++) {
ProducerRecord<String, Long> producerRecord;
if(i<50) {
producerRecord = new ProducerRecord<String,
Long>("second-topic", "Amoeba", i);
}else {
producerRecord = new ProducerRecord<String,
Long>("second-topic", "Bacteria", i);
}
RecordMetadata recordMetadata =
parallelProducer.send(producerRecord).get();
System.out.printf("Sent record : with key %s and value
%d to partition %s", producerRecord.key(), producerRecord.value(),
recordMetadata.partition());
System.out.println();
}
parallelProducer.close();
}
}
根据文档,kafka 代理通过使用密钥(生成密钥的哈希值)来决定将特定消息放入哪个分区。
我在一段时间后更改我的记录的密钥,但消息仍然每次都转到同一个分区。
示例控制台代码输出:
Sent record : with key Amoeba and value 0 to partition 1
Sent record : with key Amoeba and value 1 to partition 1
Sent record : with key Amoeba and value 2 to partition 1
Sent record : with key Amoeba and value 3 to partition 1
Sent record : with key Amoeba and value 4 to partition 1
Sent record : with key Amoeba and value 5 to partition 1
Sent record : with key Amoeba and value 6 to partition 1
Sent record : with key Amoeba and value 7 to partition 1
Sent record : with key Amoeba and value 8 to partition 1
Sent record : with key Amoeba and value 9 to partition 1
Sent record : with key Amoeba and value 10 to partition 1
Sent record : with key Amoeba and value 11 to partition 1
Sent record : with key Amoeba and value 12 to partition 1
Sent record : with key Amoeba and value 13 to partition 1
Sent record : with key Bacteria and value 87 to partition 1
Sent record : with key Bacteria and value 88 to partition 1
Sent record : with key Bacteria and value 89 to partition 1
Sent record : with key Bacteria and value 90 to partition 1
Sent record : with key Bacteria and value 91 to partition 1
Sent record : with key Bacteria and value 92 to partition 1
Sent record : with key Bacteria and value 93 to partition 1
Sent record : with key Bacteria and value 94 to partition 1
Sent record : with key Bacteria and value 95 to partition 1
Sent record : with key Bacteria and value 96 to partition 1
Sent record : with key Bacteria and value 97 to partition 1
Sent record : with key Bacteria and value 98 to partition 1
Sent record : with key Bacteria and value 99 to partition 1
一切正常。
在您的特定情况下,KafkaProducer
(确定分区)使用的分区程序为两个键计算相同的分区:Amoeba
、Bacteria
。默认情况下,KafkaProducer 使用 org.apache.kafka.clients.producer.internals.DefaultPartitioner
.
建议:更换key或者增加分区数
注意:Producer决定将消息放到哪个分区,而不是Broker。
从 Producer<String, String> producer = new KafkaProducer<String, String>
更改代码
至:
KafkaProducer<String, String> producer = new KafkaProducer<String, String>
默认情况下,接口实现将数据放入同一分区。所以使用 KafkaProducer
而不是简单的 Producer
从 2.4 版及更高版本的 Apache Kafka 开始,默认分区策略已更改为具有空键的记录,其中粘性分区是默认行为。
之前的循环策略意味着具有空键的记录将跨分区拆分,新的粘性分区策略将记录发送到同一分区,直到分区的批处理“完成”(这由 batch.size 或 linger.ms)
查看这篇文章了解更多信息:
Improvements with Sticky Partitioner
我是 Kafka 的新手,正在阅读可用的官方文档。
在我的本地系统上,我已经启动了一个 kafka 实例和 zookeeper。 Zookeper 和 kafka 服务器都在默认端口上 运行ning。
我创建了一个主题 "test",复制因子为 1,因为我只有一个 kafka 实例,运行ning。
我还创建了两个分区。
我有两个消费者在同一个消费者组中订阅了这个队列。
现在我已经在 windows 机器上使用命令提示符启动消费者。
当我从命令提示符启动生产者并将消息发布到主题时,一切正常。 Kafka 使用循环将消息推送到两个分区,并且每个消费者交替接收消息,因为他们每个人都在监听不同的分区。
但是当我使用 java kafka-client jar 创建生产者时,即使我对消息使用不同的密钥,生产者也会将所有消息推送到同一个分区,因为所有消息都是在同一个分区上接收的消费者。
分区也不是静态的,每次我 运行 我的制作人时它都会不断变化。
我已经尝试过相同的场景,生产者从命令提示符开始,配置与我使用 java 代码提供给 kafka-client 生产者的配置完全相同。命令提示生产者似乎工作正常,但代码生产者将所有消息推送到同一分区。
我尝试更改某些消息的密钥,希望代理将其发送到不同的分区,因为文档中提到代理使用消息的密钥路由消息。
public class KafkaProducerParallel {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "parallelism-
producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);
Producer<String, Long> parallelProducer = new KafkaProducer<>
(properties);
for(long i=0;i<100;i++) {
ProducerRecord<String, Long> producerRecord;
if(i<50) {
producerRecord = new ProducerRecord<String,
Long>("second-topic", "Amoeba", i);
}else {
producerRecord = new ProducerRecord<String,
Long>("second-topic", "Bacteria", i);
}
RecordMetadata recordMetadata =
parallelProducer.send(producerRecord).get();
System.out.printf("Sent record : with key %s and value
%d to partition %s", producerRecord.key(), producerRecord.value(),
recordMetadata.partition());
System.out.println();
}
parallelProducer.close();
}
}
根据文档,kafka 代理通过使用密钥(生成密钥的哈希值)来决定将特定消息放入哪个分区。 我在一段时间后更改我的记录的密钥,但消息仍然每次都转到同一个分区。
示例控制台代码输出:
Sent record : with key Amoeba and value 0 to partition 1
Sent record : with key Amoeba and value 1 to partition 1
Sent record : with key Amoeba and value 2 to partition 1
Sent record : with key Amoeba and value 3 to partition 1
Sent record : with key Amoeba and value 4 to partition 1
Sent record : with key Amoeba and value 5 to partition 1
Sent record : with key Amoeba and value 6 to partition 1
Sent record : with key Amoeba and value 7 to partition 1
Sent record : with key Amoeba and value 8 to partition 1
Sent record : with key Amoeba and value 9 to partition 1
Sent record : with key Amoeba and value 10 to partition 1
Sent record : with key Amoeba and value 11 to partition 1
Sent record : with key Amoeba and value 12 to partition 1
Sent record : with key Amoeba and value 13 to partition 1
Sent record : with key Bacteria and value 87 to partition 1
Sent record : with key Bacteria and value 88 to partition 1
Sent record : with key Bacteria and value 89 to partition 1
Sent record : with key Bacteria and value 90 to partition 1
Sent record : with key Bacteria and value 91 to partition 1
Sent record : with key Bacteria and value 92 to partition 1
Sent record : with key Bacteria and value 93 to partition 1
Sent record : with key Bacteria and value 94 to partition 1
Sent record : with key Bacteria and value 95 to partition 1
Sent record : with key Bacteria and value 96 to partition 1
Sent record : with key Bacteria and value 97 to partition 1
Sent record : with key Bacteria and value 98 to partition 1
Sent record : with key Bacteria and value 99 to partition 1
一切正常。
在您的特定情况下,KafkaProducer
(确定分区)使用的分区程序为两个键计算相同的分区:Amoeba
、Bacteria
。默认情况下,KafkaProducer 使用 org.apache.kafka.clients.producer.internals.DefaultPartitioner
.
建议:更换key或者增加分区数
注意:Producer决定将消息放到哪个分区,而不是Broker。
从 Producer<String, String> producer = new KafkaProducer<String, String>
更改代码
至:
KafkaProducer<String, String> producer = new KafkaProducer<String, String>
默认情况下,接口实现将数据放入同一分区。所以使用 KafkaProducer
而不是简单的 Producer
从 2.4 版及更高版本的 Apache Kafka 开始,默认分区策略已更改为具有空键的记录,其中粘性分区是默认行为。
之前的循环策略意味着具有空键的记录将跨分区拆分,新的粘性分区策略将记录发送到同一分区,直到分区的批处理“完成”(这由 batch.size 或 linger.ms)
查看这篇文章了解更多信息: Improvements with Sticky Partitioner