如何从 kafka Stream 获取最新值
How to get the latest value from a kafka Stream
我对 Kafka 相当陌生,streaming.I 有一个要求,就像每次我 运行 kafka 生产者和消费者我应该得到生产者产生的唯一消息。
下面是生产者和消费者的基本代码
制作人
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("test", "key", jsonstring)
producer.send(record)
producer.close()
消费者
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "13")
val consumer: KafkaConsumer[String, Map[String,Any]] = new KafkaConsumer[String, Map[String,Any]](props)
consumer.subscribe(util.Arrays.asList("test"))
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator){
println(data.value())
}
我使用的输入Json如下
{
"id":1,
"Name":"foo"
}
现在我面临的问题是每次我 运行 程序我得到重复的 values.For 示例如果我 运行 代码两次消费者输出看起来像这样
{
"id":1,
"Name":"foo"
}
{
"id":1,
"Name":"foo"
}
我想要的输出就像我 运行 程序一样,生产者处理的唯一消息应该被消费并且应该被打印。
我尝试了一些方法,比如将消费者属性的偏移量更改为最新
props.put("auto.offset.reset", "latest")
我也试过下面提到的东西,但它对我不起作用
你能推荐一些替代方案吗??
Consumer 按顺序从主题分区读取消息。
如果你调用 poll(),它 returns 记录写入 Kafka,我们组中的消费者还没有读过。 Kafka 跟踪它们在每个分区上的消费偏移量,以了解在重启时从哪里开始消费。
消费者使用 commit.
在主题 __consumer_offsets 中维护他们的分区偏移量
Commit is the action of updating the current position in
__consumer_offsets.
如果消费者重新启动,为了知道从哪里开始消费,消费者将读取每个分区的最新提交的偏移量并从那里继续。
您可以通过两种方式控制提交,或者将自动提交设置为 true 和提交间隔
1.By enable.auto.commit 真
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
2.Manual 提交
consumer.commitAsync();//asyn commit
or
consumer.commitSync();//sync commit
如果您提交失败,它将从上次提交的位置重新开始,如下图所示
auto.offset.reset:
消费者第一次重启后,它使用auto.offset.reset来确定每个分配分区的初始位置。请注意,当组首次使用唯一组 ID 创建时,在使用任何消息之前,位置将根据可配置的偏移量重置策略设置 (auto.offset.reset)。之后,它将继续增量消费消息并使用提交(如上所述)来跟踪最新的消费消息
Note: If the consumer crashes before any offset has been committed,
then the consumer which takes over its partitions will use the reset
policy.
所以在你的情况下
- 使用手动偏移量提交或 enable.auto.commit true 自动提交。
- 如果您更改组,如果它将处理不同的消费者并使用 auto.offset.reset 分配偏移量,请始终使用相同的组 ID。
参考:https://www.confluent.io/resources/kafka-the-definitive-guide/
我对 Kafka 相当陌生,streaming.I 有一个要求,就像每次我 运行 kafka 生产者和消费者我应该得到生产者产生的唯一消息。
下面是生产者和消费者的基本代码
制作人
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("test", "key", jsonstring)
producer.send(record)
producer.close()
消费者
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "13")
val consumer: KafkaConsumer[String, Map[String,Any]] = new KafkaConsumer[String, Map[String,Any]](props)
consumer.subscribe(util.Arrays.asList("test"))
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator){
println(data.value())
}
我使用的输入Json如下
{
"id":1,
"Name":"foo"
}
现在我面临的问题是每次我 运行 程序我得到重复的 values.For 示例如果我 运行 代码两次消费者输出看起来像这样
{
"id":1,
"Name":"foo"
}
{
"id":1,
"Name":"foo"
}
我想要的输出就像我 运行 程序一样,生产者处理的唯一消息应该被消费并且应该被打印。
我尝试了一些方法,比如将消费者属性的偏移量更改为最新
props.put("auto.offset.reset", "latest")
我也试过下面提到的东西,但它对我不起作用
你能推荐一些替代方案吗??
Consumer 按顺序从主题分区读取消息。 如果你调用 poll(),它 returns 记录写入 Kafka,我们组中的消费者还没有读过。 Kafka 跟踪它们在每个分区上的消费偏移量,以了解在重启时从哪里开始消费。 消费者使用 commit.
在主题 __consumer_offsets 中维护他们的分区偏移量Commit is the action of updating the current position in __consumer_offsets.
如果消费者重新启动,为了知道从哪里开始消费,消费者将读取每个分区的最新提交的偏移量并从那里继续。
您可以通过两种方式控制提交,或者将自动提交设置为 true 和提交间隔
1.By enable.auto.commit 真
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
2.Manual 提交
consumer.commitAsync();//asyn commit
or
consumer.commitSync();//sync commit
如果您提交失败,它将从上次提交的位置重新开始,如下图所示
auto.offset.reset:
消费者第一次重启后,它使用auto.offset.reset来确定每个分配分区的初始位置。请注意,当组首次使用唯一组 ID 创建时,在使用任何消息之前,位置将根据可配置的偏移量重置策略设置 (auto.offset.reset)。之后,它将继续增量消费消息并使用提交(如上所述)来跟踪最新的消费消息
Note: If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.
所以在你的情况下
- 使用手动偏移量提交或 enable.auto.commit true 自动提交。
- 如果您更改组,如果它将处理不同的消费者并使用 auto.offset.reset 分配偏移量,请始终使用相同的组 ID。
参考:https://www.confluent.io/resources/kafka-the-definitive-guide/