通过外部触发器寻找偏移量
Seek to an offset via an external trigger
目前我使用 AcknoledgingMessageListener 来实现使用 spring-Kafka 的 Kafka 消费者。此实现帮助我收听特定主题并使用手动确认处理消息。
我现在需要构建以下功能:
让我们假设,对于一些环境异常或通过该主题输入的一些不良数据,我需要从特定偏移量重播某个主题的数据。这将是一个手动触发器(主要是通过执行 Java class)。
如果我可以检索这些偏移量之间的消息并将其作为重放主题提供给它,以便新的消费者可以处理这些消息,从而保持原始主题的偏移量不变,那将是理想的。
- CosumerSeekAware 界面 - 如果这是答案,我如何从外部触发它?通过让我们说一个mvn -Dexec。我不确定这是否可能
- 另外假设我有一个崩溃时间戳,是否可以反省主题以找到与崩溃对应的偏移量,以便我可以从该偏移量重播?
- 我可以找到与某些特定数据对应的偏移量,以便我可以重播那些特定的偏移量吗?
所有这些要求都是为了围绕我们的 Kafka 功能构建弹性层。我需要所有这些都由一个单独的可执行文件 class 管理,可以手动触发提供相关数据(如时间戳等)。 class 应确定偏移量,然后寻找该偏移量,检索与这些偏移量对应的消息,并将它们 post 到一个单独的主题。有人可以指出我正确的方向吗?我怕是在兜圈子
so that a new consumer can process those messages thus keeping the offsets intact on the original topic.
只需创建一个具有不同组 ID(新消费者)的新侦听器容器,并在分配分区时使用 ConsumerAwareRebalanceListener
(或 ConsumerSeekAware
)执行查找。
您将需要某种机制来了解新消费者何时应停止消费(此时您可以 stop()
新容器)。也许在新消费者上设置 max.poll.records=1
这样他就不会预取超过失败点。
我不确定你所说的 #3 是什么意思。
目前我使用 AcknoledgingMessageListener 来实现使用 spring-Kafka 的 Kafka 消费者。此实现帮助我收听特定主题并使用手动确认处理消息。 我现在需要构建以下功能: 让我们假设,对于一些环境异常或通过该主题输入的一些不良数据,我需要从特定偏移量重播某个主题的数据。这将是一个手动触发器(主要是通过执行 Java class)。
如果我可以检索这些偏移量之间的消息并将其作为重放主题提供给它,以便新的消费者可以处理这些消息,从而保持原始主题的偏移量不变,那将是理想的。
- CosumerSeekAware 界面 - 如果这是答案,我如何从外部触发它?通过让我们说一个mvn -Dexec。我不确定这是否可能
- 另外假设我有一个崩溃时间戳,是否可以反省主题以找到与崩溃对应的偏移量,以便我可以从该偏移量重播?
- 我可以找到与某些特定数据对应的偏移量,以便我可以重播那些特定的偏移量吗?
所有这些要求都是为了围绕我们的 Kafka 功能构建弹性层。我需要所有这些都由一个单独的可执行文件 class 管理,可以手动触发提供相关数据(如时间戳等)。 class 应确定偏移量,然后寻找该偏移量,检索与这些偏移量对应的消息,并将它们 post 到一个单独的主题。有人可以指出我正确的方向吗?我怕是在兜圈子
so that a new consumer can process those messages thus keeping the offsets intact on the original topic.
只需创建一个具有不同组 ID(新消费者)的新侦听器容器,并在分配分区时使用 ConsumerAwareRebalanceListener
(或 ConsumerSeekAware
)执行查找。
您将需要某种机制来了解新消费者何时应停止消费(此时您可以 stop()
新容器)。也许在新消费者上设置 max.poll.records=1
这样他就不会预取超过失败点。
我不确定你所说的 #3 是什么意思。