Kafka 消费者 - 暂停从特定的 kafka 主题分区轮询事件以将其用作延迟队列
Kafka consumer- Pause polling of event from specific kafka topic partition to use it as delayed queue
我们的系统中有一个场景,其中 kafka 主题 XYZ 用户详细信息由其他一些生产应用程序 A(不同系统)发布,而我的应用程序 B 正在使用该主题。
要求应用程序 B 需要在 A 将事件放入 kafka 主题 XYZ 后 45 分钟(或任何可配置的时间)使用该事件(延迟的原因是某些系统 C 的另一个 REST api 需要根据特定用户的此用户详细信息事件触发,以确认它是否为该用户设置了一些标志,并且可以在 45 分钟的持续时间内的任何时间设置该标志,尽管如果 C 没有能力发布到 kafka 或以任何方式通知我们,它可能已经解决了)。
我们的应用B写在spring.
我尝试的解决方案是从 Kafka 获取事件并检查队列中第一个事件的时间戳,如果该事件已经 45 分钟则处理它,或者如果它少于 45 分钟则暂停轮询 kafka使用 MessageListnerContainer pause() 方法将容器放置该时间,直到达到 45 分钟。
如下所示 -
@KafkaListener(id = "delayed_listener", topics = "test_topic", groupId = "test_group")
public void delayedConsumer(@Payload String message,
Acknowledgment acknowledgment) {
UserDataEvent userDataEvent = null;
try {
userDataEvent = this.mapper.readValue(message, TopicRequest.class);
} catch (JsonProcessingException e) {
logger.error("error while parsing message");
}
MessageListenerContainer delayedContainer = this.kafkaListenerEndpointRegistry.getListenerContainer("delayed_listener");
if (userDataEvent.getPublishTime() > 45 minutes) // this will be some configured value
{
long sleepTimeForPolling = userDataEvent.getPublishTime() - System.currentTimeMillis();
// give negative ack to put already polled messages back to kafka topic
acknowledgment.nack(1000);
// pause container, and later resume it
delayedContainer.pause();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(() -> {
delayedContainer.resume();
}, sleepTimeForPolling, TimeUnit.MILLISECONDS);
return;
}
// if message was already 45 minutes old then process it
this.service.processMessage(userDataEvent);
acknowledgment.acknowledge();
}
虽然它适用于单个分区,但我不确定这是否是正确的方法,对此有何评论?我还看到多个分区会导致问题,因为上面的暂停方法调用将暂停整个容器,如果其中一个分区有旧消息,如果容器因其他分区中的新消息而暂停,则不会被消耗。
我可以以某种方式在分区级别使用此暂停逻辑吗?
有什么 better/recommended 解决方案可以在一定的可配置时间后实现这种延迟处理,我可以在这种情况下采用,而不是像上面那样做?
Kafka 并不是真正为这种场景设计的。
我认为该技术可行的一种方法是将容器并发设置为与主题中的分区数相同,以便每个分区由不同线程上的不同消费者处理;然后 pause/resume 个人 Consumer<?, ?>
而不是整个容器。
为此,添加 Consumer<?, ?>
作为附加参数;要恢复消费者,请设置 idleEventInterval
并检查事件侦听器 (ListenerContainerIdleEvent
) 中的计时器。 Consumer<?, ?>
是事件的 属性,因此您可以在那里调用 resume()
。
我们的系统中有一个场景,其中 kafka 主题 XYZ 用户详细信息由其他一些生产应用程序 A(不同系统)发布,而我的应用程序 B 正在使用该主题。
要求应用程序 B 需要在 A 将事件放入 kafka 主题 XYZ 后 45 分钟(或任何可配置的时间)使用该事件(延迟的原因是某些系统 C 的另一个 REST api 需要根据特定用户的此用户详细信息事件触发,以确认它是否为该用户设置了一些标志,并且可以在 45 分钟的持续时间内的任何时间设置该标志,尽管如果 C 没有能力发布到 kafka 或以任何方式通知我们,它可能已经解决了)。
我们的应用B写在spring.
我尝试的解决方案是从 Kafka 获取事件并检查队列中第一个事件的时间戳,如果该事件已经 45 分钟则处理它,或者如果它少于 45 分钟则暂停轮询 kafka使用 MessageListnerContainer pause() 方法将容器放置该时间,直到达到 45 分钟。 如下所示 -
@KafkaListener(id = "delayed_listener", topics = "test_topic", groupId = "test_group")
public void delayedConsumer(@Payload String message,
Acknowledgment acknowledgment) {
UserDataEvent userDataEvent = null;
try {
userDataEvent = this.mapper.readValue(message, TopicRequest.class);
} catch (JsonProcessingException e) {
logger.error("error while parsing message");
}
MessageListenerContainer delayedContainer = this.kafkaListenerEndpointRegistry.getListenerContainer("delayed_listener");
if (userDataEvent.getPublishTime() > 45 minutes) // this will be some configured value
{
long sleepTimeForPolling = userDataEvent.getPublishTime() - System.currentTimeMillis();
// give negative ack to put already polled messages back to kafka topic
acknowledgment.nack(1000);
// pause container, and later resume it
delayedContainer.pause();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(() -> {
delayedContainer.resume();
}, sleepTimeForPolling, TimeUnit.MILLISECONDS);
return;
}
// if message was already 45 minutes old then process it
this.service.processMessage(userDataEvent);
acknowledgment.acknowledge();
}
虽然它适用于单个分区,但我不确定这是否是正确的方法,对此有何评论?我还看到多个分区会导致问题,因为上面的暂停方法调用将暂停整个容器,如果其中一个分区有旧消息,如果容器因其他分区中的新消息而暂停,则不会被消耗。 我可以以某种方式在分区级别使用此暂停逻辑吗?
有什么 better/recommended 解决方案可以在一定的可配置时间后实现这种延迟处理,我可以在这种情况下采用,而不是像上面那样做?
Kafka 并不是真正为这种场景设计的。
我认为该技术可行的一种方法是将容器并发设置为与主题中的分区数相同,以便每个分区由不同线程上的不同消费者处理;然后 pause/resume 个人 Consumer<?, ?>
而不是整个容器。
为此,添加 Consumer<?, ?>
作为附加参数;要恢复消费者,请设置 idleEventInterval
并检查事件侦听器 (ListenerContainerIdleEvent
) 中的计时器。 Consumer<?, ?>
是事件的 属性,因此您可以在那里调用 resume()
。