如何使用 spring-kafka 从给定主题和分区以特定偏移量重新发送(读取)旧的 kafka 消息?

How to re-send (read) an old kafka message from given topic and partition at specific offset using spring-kafka?

给定主题名称、分区号和偏移量,如何从主题中只读取一条记录?

在我基于 Sprng Boot 的应用程序中,我使用 Kafka 导入业务数据。 导入记录被发送到 import_queue 并被一个或多个业务模块使用。即使消费者未能从记录中导入数据以继续从以下记录中导入数据,记录也始终得到确认。

稍后用户(在 he/she 修复了一些相关业务数据之后)可以决定重新发送一个或多个失败(但已确认)的导入记录。

每条记录的偏移量、分区号和主题名称都存储在我的应用程序内部的 SQL 数据库中。

根据参考文档和一些 Whosebug 问题,我发现我必须:

  1. 设置一个容器(consumer/listener)
  2. 倒回(寻找)到所需的偏移量
  3. 读取一条记录
  4. 跳过读取剩余记录

这是从 kafka 主题中只读取一条旧记录的唯一方法吗? 或者有更简单的解决方案吗?

解决方案

根据@Gary 的建议:

ConsumerRecord<byte[], byte[]> read(String topic, int partition, long offset) {
    Map<String, Object> configs = Map.of(
            "bootstrap.servers", "localhost:9092",
            "group.id", "incubator_retry",
            "max.poll.records", 1);
    DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(
            configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());

    try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.assign(List.of(topicPartition));
        consumer.seek(topicPartition, offset);
        ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(5000));
        if (consumerRecords.isEmpty()) {
            throw new RuntimeException(String.format("Timeout polling from topic %s partition %d at offset %d",
                    topicPartition.topic(), topicPartition.partition(), offset));
        }
        return consumerRecords.iterator().next();
    }
}

有一个更简单的解决方案。

  • 使用 DefaultConsumerFactory 创建一个 KafkaConsumer(或简单地创建一个)
  • 使用不同的group.id
  • max.poll.records 属性 设置为 1
  • consumer.assign(...)想要的topic/partition
  • seek(...) 到所需的偏移量
  • poll(...)直到你获得记录
  • close()消费者

如果您正在使用任何消息转换(除了 Kafka 反序列化器),您将必须手动调用转换器。