如何使用 spring-kafka 从给定主题和分区以特定偏移量重新发送(读取)旧的 kafka 消息?
How to re-send (read) an old kafka message from given topic and partition at specific offset using spring-kafka?
给定主题名称、分区号和偏移量,如何从主题中只读取一条记录?
在我基于 Sprng Boot 的应用程序中,我使用 Kafka 导入业务数据。
导入记录被发送到 import_queue 并被一个或多个业务模块使用。即使消费者未能从记录中导入数据以继续从以下记录中导入数据,记录也始终得到确认。
稍后用户(在 he/she 修复了一些相关业务数据之后)可以决定重新发送一个或多个失败(但已确认)的导入记录。
每条记录的偏移量、分区号和主题名称都存储在我的应用程序内部的 SQL 数据库中。
根据参考文档和一些 Whosebug 问题,我发现我必须:
- 设置一个容器(consumer/listener)
- 倒回(寻找)到所需的偏移量
- 读取一条记录
- 跳过读取剩余记录
这是从 kafka 主题中只读取一条旧记录的唯一方法吗?
或者有更简单的解决方案吗?
解决方案
根据@Gary 的建议:
ConsumerRecord<byte[], byte[]> read(String topic, int partition, long offset) {
Map<String, Object> configs = Map.of(
"bootstrap.servers", "localhost:9092",
"group.id", "incubator_retry",
"max.poll.records", 1);
DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(
configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(5000));
if (consumerRecords.isEmpty()) {
throw new RuntimeException(String.format("Timeout polling from topic %s partition %d at offset %d",
topicPartition.topic(), topicPartition.partition(), offset));
}
return consumerRecords.iterator().next();
}
}
有一个更简单的解决方案。
- 使用
DefaultConsumerFactory
创建一个 KafkaConsumer
(或简单地创建一个)
- 使用不同的
group.id
- 将
max.poll.records
属性 设置为 1
consumer.assign(...)
想要的topic/partition
seek(...)
到所需的偏移量
poll(...)
直到你获得记录
close()
消费者
如果您正在使用任何消息转换(除了 Kafka 反序列化器),您将必须手动调用转换器。
给定主题名称、分区号和偏移量,如何从主题中只读取一条记录?
在我基于 Sprng Boot 的应用程序中,我使用 Kafka 导入业务数据。 导入记录被发送到 import_queue 并被一个或多个业务模块使用。即使消费者未能从记录中导入数据以继续从以下记录中导入数据,记录也始终得到确认。
稍后用户(在 he/she 修复了一些相关业务数据之后)可以决定重新发送一个或多个失败(但已确认)的导入记录。
每条记录的偏移量、分区号和主题名称都存储在我的应用程序内部的 SQL 数据库中。
根据参考文档和一些 Whosebug 问题,我发现我必须:
- 设置一个容器(consumer/listener)
- 倒回(寻找)到所需的偏移量
- 读取一条记录
- 跳过读取剩余记录
这是从 kafka 主题中只读取一条旧记录的唯一方法吗? 或者有更简单的解决方案吗?
解决方案
根据@Gary 的建议:
ConsumerRecord<byte[], byte[]> read(String topic, int partition, long offset) {
Map<String, Object> configs = Map.of(
"bootstrap.servers", "localhost:9092",
"group.id", "incubator_retry",
"max.poll.records", 1);
DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(
configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(5000));
if (consumerRecords.isEmpty()) {
throw new RuntimeException(String.format("Timeout polling from topic %s partition %d at offset %d",
topicPartition.topic(), topicPartition.partition(), offset));
}
return consumerRecords.iterator().next();
}
}
有一个更简单的解决方案。
- 使用
DefaultConsumerFactory
创建一个KafkaConsumer
(或简单地创建一个) - 使用不同的
group.id
- 将
max.poll.records
属性 设置为 1 consumer.assign(...)
想要的topic/partitionseek(...)
到所需的偏移量poll(...)
直到你获得记录close()
消费者
如果您正在使用任何消息转换(除了 Kafka 反序列化器),您将必须手动调用转换器。