How can I process a @KafkaListener method in different threads?

I have a Kafka handler in Spring Boot:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }

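For example, the producer sends one message per second, but myService.processResponse takes 10 seconds. I need to process every message, running myService.processResponse in a new thread for each one. I could create my own executor and delegate each response to it, but I suspect Kafka offers configuration for this. I found two options: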

1) Add concurrency = "5" to the @KafkaListener annotation. It seems to work, but I'm not sure how correct it is, because there is also a second way:

2) Create a ConcurrentKafkaListenerContainerFactory and set its ConsumerFactory and concurrency.

I don't understand the difference between these approaches. Is adding concurrency = "5" to the @KafkaListener annotation enough, or do I need to create a ConcurrentKafkaListenerContainerFactory?

Or have I misunderstood everything, and there is another way?

Using an executor complicates the management of committed offsets; it is not recommended.

With @KafkaListener in Spring Boot, the framework creates a ConcurrentKafkaListenerContainerFactory for you.

The concurrency attribute on the annotation is just a convenience; it overrides the factory's setting.

This lets you use the same factory for multiple listeners, each with a different concurrency.

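You can set the container concurrency (the default) with a Boot property, spring.kafka.listener.concurrency; that value is overridden by the annotation's value. See the javadoc: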

    /**
     * Override the container factory's {@code concurrency} setting for this listener. May
     * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
     * which case {@link Number#intValue()} is used to obtain the value.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the concurrency.
     * @since 2.2
     */
    String concurrency() default "";
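A minimal sketch of how the two options in this answer fit together, assuming Spring Boot with spring-kafka on the classpath and a String-typed `ConsumerFactory` bean (Boot auto-configures one). The class names, the second topic `topic-two`, the group `other`, and the concurrency values are hypothetical. Defining a bean named `kafkaListenerContainerFactory` replaces the one Boot would otherwise create, and each listener can still override the concurrency on its annotation.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Configuration
public class KafkaConsumerConfig {

    // "kafkaListenerContainerFactory" is the bean name @KafkaListener uses by default;
    // declaring it here makes Boot skip its auto-configured factory.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(2); // default: 2 consumers (threads) per listener container
        return factory;
    }
}

@Component
class ResponseListeners {

    // No concurrency attribute: uses the factory's default of 2.
    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        // delegate to your service here, e.g. myService.processResponse(response)
    }

    // Same factory, but this listener overrides the concurrency to 5 (hypothetical topic/group).
    @KafkaListener(topics = "topic-two", groupId = "other", concurrency = "5")
    public void listenOther(String message) {
        // process message
    }
}
```

If all you need is a different concurrency, the annotation attribute (or the spring.kafka.listener.concurrency property) on Boot's auto-configured factory is enough; a custom factory mainly pays off when you need other customizations. Either way, an extra consumer thread only receives records if the topic has enough partitions to go around.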

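The concurrency option has nothing to do with processing messages received by the same consumer concurrently. It applies to consumer groups: you get multiple consumers, each processing its own partitions.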
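To make the partition point concrete, here is a simplified, stdlib-only model. It is not Kafka's real assignor (assignors are pluggable and differ in detail). It only spreads a topic's partitions over `concurrency` consumers round-robin, and it keeps the rule they all share: within a group, a partition goes to at most one consumer. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentModel {

    /** Round-robin model: partition p goes to consumer p % concurrency. */
    static List<List<Integer>> assign(int partitions, int concurrency) {
        List<List<Integer>> perConsumer = new ArrayList<>();
        for (int c = 0; c < concurrency; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            perConsumer.get(p % concurrency).add(p);
        }
        return perConsumer;
    }

    /** Number of consumers (threads) that actually receive any partition. */
    static int activeConsumers(int partitions, int concurrency) {
        int active = 0;
        for (List<Integer> assigned : assign(partitions, concurrency)) {
            if (!assigned.isEmpty()) {
                active++;
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // concurrency = "5" on a 1-partition topic: one busy consumer, four idle.
        System.out.println(assign(1, 5));          // [[0], [], [], [], []]
        System.out.println(activeConsumers(1, 5)); // 1
        // 6 partitions, concurrency 5: every consumer is busy, consumer 0 gets two partitions.
        System.out.println(assign(6, 5));          // [[0, 5], [1], [2], [3], [4]]
    }
}
```

Because each partition stays with a single consumer thread, the records of one partition are still processed one after another; a 10-second handler on a 1-partition topic is not sped up by raising concurrency.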

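Handing processing off to separate threads is very complicated, and I believe the Spring-Kafka team decided against it "by design". You don't even need to dig into Spring-Kafka to understand why; see the "Detecting Consumer Failures" section of the KafkaConsumer documentation: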

Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.
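The offset problem in this quote can be illustrated with a small stdlib-only model, assuming records are handed to worker threads and may finish out of order. The class `CommitTracker` is hypothetical. A Kafka committed offset means "the next record to read", so the only safe value to commit is one past the longest contiguous run of completed offsets; committing anything further would lose in-flight records if the consumer crashed.

```java
import java.util.TreeSet;

public class CommitTracker {

    private long nextToCommit;                                    // first offset not yet known to be done
    private final TreeSet<Long> completedAhead = new TreeSet<>(); // finished, but after a gap

    public CommitTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    /** Called when a worker thread finishes the record at `offset`, in any order. */
    public synchronized void markCompleted(long offset) {
        if (offset < nextToCommit) {
            return; // already covered by the safe commit position
        }
        completedAhead.add(offset);
        // Advance only across a contiguous run of completed offsets.
        while (!completedAhead.isEmpty() && completedAhead.first() == nextToCommit) {
            completedAhead.pollFirst();
            nextToCommit++;
        }
    }

    /** Offset that is safe to commit: every record before it has been processed. */
    public synchronized long safeCommitOffset() {
        return nextToCommit;
    }

    public static void main(String[] args) {
        CommitTracker tracker = new CommitTracker(100);
        tracker.markCompleted(101); // record 100 is still running on a slow thread
        tracker.markCompleted(102);
        System.out.println(tracker.safeCommitOffset()); // 100: cannot skip past record 100
        tracker.markCompleted(100);
        System.out.println(tracker.safeCommitOffset()); // 103
    }
}
```

In a real consumer this value would be passed to KafkaConsumer.commitSync with auto-commit disabled, and, as the quote says, the partition would also be paused (KafkaConsumer.pause/resume) while work is outstanding; that wiring is left out of this sketch.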