kafka 流即使在 exactly_once 启用后也会获取重复记录
kafka stream getting duplicate records even after exactly_once enabled
我正在使用 kafka 流接收一些数据,我注意到它收到的记录比我发送的多,下面是我在消费者处的设置
消费者
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-user-process");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSettigs.getKafkaBroker());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, kafkaSettigs.getTotalStreamTHreadCounnt());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "600");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);
制作方道具
Propertiesprops=newProperties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"mybootstarpservers");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"clientnoveluser");
props.put(ProducerConfig.ACKS_CONFIG,"all");
props.put(ProducerConfig.RETRIES_CONFIG,3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,1500))
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,(newGenericSerializer<MyPojo>()).getClass().getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyRandom.class);
下面是我的生产者代码
public void producerSendData(String key, MyPojo message) throws Exception {
final Producer<String, MyPojo s> producer = myProducerInstance.createProducer();
final ProducerRecord<String, MyPojo> record = new ProducerRecord<String, MyPojo>("usertopic", key,message);
try {
producer.send(record, new ProducerCallback());
producer.flush();
}
finally {
}
}
我的主题总共有 10 个分区,我的生产者使用 Round Robin 类型的分区逻辑并平等地写入所有分区,为了在生产者端进行测试,10 个不同的线程每个写入 1000 条消息。
在消费者方面,有时我收到的消息比发送的消息多,我收到了 10867,而我只发送了 10000 条消息。
我注意到我得到了这些重复的信息,其中每个流都重新连接了以下消息。
2019-07-14T00:11:06,043DEBUG[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6]c.j.m.s.UserKafkaStreamTopology:DataatStream
key:key-29value:{"userId":"message-468","data":null,"data1":null,"data3":null}
**2019-07-14T00:11:06,043INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.c.c.KafkaConsumer:[ConsumerclientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-restore-consumer,groupId=]Unsubscribedalltopicsorpatternsandassignedpartitions
2019-07-14T00:11:06,043INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.s.p.i.StreamThread$RebalanceListener:stream-thread[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]partitionrevocationtook16ms.
suspendedactivetasks:[0_6]
suspendedstandbytasks:[]
2019-07-14T00:11:06,044INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.c.c.i.AbstractCoordinator:[ConsumerclientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-consumer,groupId=streams-user-process](Re-)joininggroup**
2019-07-14T00:11:06,043DEBUG[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6]c.j.m.s.UserKafkaStreamTopology:DataatStream
key:key-30value:{"userId":"message-569","data":null,"data1":null,"data3":null}
我需要帮助才能理解为什么我在启用 exactly_once
的情况下仍会收到更多记录
Exactly once 用于流处理保证对于每个接收到的记录,其处理结果将被反映一次,即使在失败的情况下。
Exactly_once 在 Kafka 上下文中是一个适用于 "Kafka Streams" 的概念,请记住 Kafka Streams 旨在从主题读取并生成主题.
Kafka Streams 世界中的换句话说:Exactly once 意味着当且仅当状态相应更新并且输出记录成功生成一次时,任何输入记录的处理才被视为完成。
在您的具体情况下,您的日志 key:key-30value:{"userId":"message-569","data":null,"data1":null,"data3":null}
似乎是由拓扑的 peek
方法生成的。
如果您能找到预期的事件数,您应该查看接收器主题。
因为如果出于任何原因,您的 Kafka Streams 应用程序无法将消息发布到接收器主题,那么接收传入消息并再次处理以生成输出消息然后保证 "exactly once"合同。这就是为什么在您的日志中可以多次看到相同的消息。
您可以在 https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/
找到更多详细信息
我正在使用 kafka 流接收一些数据,我注意到它收到的记录比我发送的多,下面是我在消费者处的设置
消费者
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-user-process");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSettigs.getKafkaBroker());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, kafkaSettigs.getTotalStreamTHreadCounnt());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "600");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);
制作方道具
Propertiesprops=newProperties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"mybootstarpservers");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"clientnoveluser");
props.put(ProducerConfig.ACKS_CONFIG,"all");
props.put(ProducerConfig.RETRIES_CONFIG,3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,1500))
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,(newGenericSerializer<MyPojo>()).getClass().getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyRandom.class);
下面是我的生产者代码
public void producerSendData(String key, MyPojo message) throws Exception {
final Producer<String, MyPojo s> producer = myProducerInstance.createProducer();
final ProducerRecord<String, MyPojo> record = new ProducerRecord<String, MyPojo>("usertopic", key,message);
try {
producer.send(record, new ProducerCallback());
producer.flush();
}
finally {
}
}
我的主题总共有 10 个分区,我的生产者使用 Round Robin 类型的分区逻辑并平等地写入所有分区,为了在生产者端进行测试,10 个不同的线程每个写入 1000 条消息。
在消费者方面,有时我收到的消息比发送的消息多,我收到了 10867,而我只发送了 10000 条消息。
我注意到我得到了这些重复的信息,其中每个流都重新连接了以下消息。
2019-07-14T00:11:06,043DEBUG[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6]c.j.m.s.UserKafkaStreamTopology:DataatStream
key:key-29value:{"userId":"message-468","data":null,"data1":null,"data3":null}
**2019-07-14T00:11:06,043INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.c.c.KafkaConsumer:[ConsumerclientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-restore-consumer,groupId=]Unsubscribedalltopicsorpatternsandassignedpartitions
2019-07-14T00:11:06,043INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.s.p.i.StreamThread$RebalanceListener:stream-thread[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]partitionrevocationtook16ms.
suspendedactivetasks:[0_6]
suspendedstandbytasks:[]
2019-07-14T00:11:06,044INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.c.c.i.AbstractCoordinator:[ConsumerclientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-consumer,groupId=streams-user-process](Re-)joininggroup**
2019-07-14T00:11:06,043DEBUG[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6]c.j.m.s.UserKafkaStreamTopology:DataatStream
key:key-30value:{"userId":"message-569","data":null,"data1":null,"data3":null}
我需要帮助才能理解为什么我在启用 exactly_once
的情况下仍会收到更多记录Exactly once 用于流处理保证对于每个接收到的记录,其处理结果将被反映一次,即使在失败的情况下。
Exactly_once 在 Kafka 上下文中是一个适用于 "Kafka Streams" 的概念,请记住 Kafka Streams 旨在从主题读取并生成主题.
Kafka Streams 世界中的换句话说:Exactly once 意味着当且仅当状态相应更新并且输出记录成功生成一次时,任何输入记录的处理才被视为完成。
在您的具体情况下,您的日志 key:key-30value:{"userId":"message-569","data":null,"data1":null,"data3":null}
似乎是由拓扑的 peek
方法生成的。
如果您能找到预期的事件数,您应该查看接收器主题。
因为如果出于任何原因,您的 Kafka Streams 应用程序无法将消息发布到接收器主题,那么接收传入消息并再次处理以生成输出消息然后保证 "exactly once"合同。这就是为什么在您的日志中可以多次看到相同的消息。
您可以在 https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/
找到更多详细信息