When to use ConcurrentKafkaListenerContainerFactory?

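I am new to Kafka. I went through the documentation, but I couldn't understand it. Can someone explain when to use the ConcurrentKafkaListenerContainerFactory class? I have used the KafkaConsumer class, but I see ConcurrentKafkaListenerContainerFactory being used in my current project. Please explain what it is for.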

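The Kafka Consumer API is not thread-safe. ConcurrentKafkaListenerContainerFactory provides a concurrent way to use the Kafka Consumer API, and also lets you set other Kafka consumer properties. It does this by giving each consumer its own thread instead of sharing one KafkaConsumer between threads.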

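The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Unsynchronized access will result in ConcurrentModificationException.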
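To make the ConcurrentModificationException concrete, here is a minimal sketch of a single-owner guard. It is a hypothetical, simplified model loosely based on the acquire/release check inside KafkaConsumer; the class name SingleThreadGuard and its methods are illustrative only and are not part of kafka-clients. While one thread is inside a "consumer" call, any other thread that enters is rejected instead of being blocked.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the single-thread check inside KafkaConsumer.
// It is NOT the kafka-clients class; it only shows why unsynchronized access fails.
class SingleThreadGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private int refCount = 0; // only ever touched by the owning thread

    // Called at the start of every public "consumer" operation.
    void acquire() {
        long caller = Thread.currentThread().getId();
        // Succeed if the caller already owns the guard (re-entrant call)
        // or if nobody owns it; otherwise another thread is inside.
        if (caller != ownerThreadId.get()
                && !ownerThreadId.compareAndSet(NO_OWNER, caller)) {
            throw new ConcurrentModificationException(
                    "consumer is not safe for multi-threaded access");
        }
        refCount++;
    }

    // Called at the end of every public "consumer" operation.
    void release() {
        if (--refCount == 0) {
            ownerThreadId.set(NO_OWNER);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // main thread is "inside poll()"
        Thread intruder = new Thread(() -> {
            try {
                guard.acquire();
                System.out.println("unexpected: second thread got in");
            } catch (ConcurrentModificationException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        });
        intruder.start();
        intruder.join();
        guard.release();
    }
}
```

Because the guard fails fast rather than locking, the usual fix is not to share one consumer at all, which is the approach the Spring containers take.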

If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However, in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have little or no data to consume.
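The real KafkaConsumer supports this kind of prioritization through pause(...) and resume(...), which take collections of TopicPartition. The sketch below is a hypothetical in-memory model (FlowControlModel and its methods are illustrative names, not the Kafka API) showing both behaviors described above: unpaused partitions are polled with equal priority, and pausing a partition lets the consumer drain the others first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of consumer flow control, not the Kafka client.
// The real KafkaConsumer offers pause(...) and resume(...) taking TopicPartition collections.
class FlowControlModel {
    private final Map<String, Deque<String>> partitions = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    void assign(String partition, List<String> records) {
        partitions.put(partition, new ArrayDeque<>(records));
    }

    void pause(String partition)  { paused.add(partition); }
    void resume(String partition) { paused.remove(partition); }

    boolean isDrained(String partition) {
        return partitions.get(partition).isEmpty();
    }

    // Takes at most one record from every assigned, non-paused partition,
    // so all unpaused partitions get the same priority.
    List<String> poll() {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> entry : partitions.entrySet()) {
            Deque<String> records = entry.getValue();
            if (!paused.contains(entry.getKey()) && !records.isEmpty()) {
                batch.add(records.poll());
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        FlowControlModel consumer = new FlowControlModel();
        consumer.assign("urgent-0", List.of("u1", "u2"));
        consumer.assign("bulk-0", List.of("b1", "b2"));
        consumer.pause("bulk-0");                 // focus on urgent-0 first
        while (!consumer.isDrained("urgent-0")) {
            System.out.println(consumer.poll());  // [u1], then [u2]
        }
        consumer.resume("bulk-0");                // urgent-0 has no data left
        System.out.println(consumer.poll());      // [b1]
    }
}
```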

Spring-kafka

ConcurrentKafkaListenerContainerFactory is used to create containers for methods annotated with @KafkaListener.

There are two MessageListenerContainer implementations in Spring Kafka:

KafkaMessageListenerContainer
ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
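The delegation described above can be pictured with a small thread model. This is a hypothetical sketch (ContainerModel, SingleThreadContainer and runConcurrent are illustrative names, not Spring classes): each single-thread container handles all of its partitions on one thread, and the "concurrent" wrapper simply starts one such container per partition subset. The partition subsets are passed in already split, so the sketch focuses on threading only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the two container roles; these are NOT the Spring classes.
class ContainerModel {

    // Role of KafkaMessageListenerContainer: every partition it owns
    // is handled on one dedicated thread.
    static final class SingleThreadContainer {
        private final Thread thread;

        SingleThreadContainer(String name, List<String> partitions,
                              Map<String, String> handledBy) {
            this.thread = new Thread(() -> {
                for (String partition : partitions) {
                    // Record which thread "consumed" this partition.
                    handledBy.put(partition, Thread.currentThread().getName());
                }
            }, name);
        }

        void start() { thread.start(); }

        void join() throws InterruptedException { thread.join(); }
    }

    // Role of ConcurrentMessageListenerContainer: one single-thread delegate
    // per partition subset, all running at the same time.
    static Map<String, String> runConcurrent(List<List<String>> assignments)
            throws InterruptedException {
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        List<SingleThreadContainer> delegates = new ArrayList<>();
        for (int i = 0; i < assignments.size(); i++) {
            delegates.add(new SingleThreadContainer(
                    "container-" + i, assignments.get(i), handledBy));
        }
        for (SingleThreadContainer delegate : delegates) delegate.start();
        for (SingleThreadContainer delegate : delegates) delegate.join();
        return handledBy;
    }

    public static void main(String[] args) throws InterruptedException {
        // One container for everything: a single thread sees all partitions.
        System.out.println(runConcurrent(List.of(List.of("t-0", "t-1", "t-2", "t-3"))));
        // Two delegates: partitions are split across two threads.
        System.out.println(runConcurrent(List.of(List.of("t-0", "t-1"), List.of("t-2", "t-3"))));
    }
}
```

In the real containers each delegate also owns its own KafkaConsumer, which is why no consumer is ever shared between threads.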

Using ConcurrentMessageListenerContainer

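// Declared inside a @Configuration class that also defines consumerFactory()
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // ConsumerFactory used to create the underlying KafkaConsumer instances
    factory.setConsumerFactory(consumerFactory());
    // Each container gets 3 KafkaMessageListenerContainer delegates (one thread each)
    factory.setConcurrency(3);
    // Maximum time in milliseconds each consumer blocks in poll() waiting for records
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}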

It has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.

If six TopicPartition instances are provided and the concurrency is 3, each container gets two partitions. For five TopicPartition instances, two containers get two partitions and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down so that each container gets one partition.
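The split described above can be reproduced with a few lines of arithmetic. The following is a sketch under stated assumptions: PartitionDistribution and distribute are hypothetical names, the code mirrors the documented outcome rather than Spring's actual source, and the exact order in which Spring hands partitions to containers may differ. It also only models explicitly provided partitions; when topics are subscribed through group management, Kafka's assignor decides instead, and any consumers beyond the partition count simply sit idle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing the documented distribution; not Spring's source.
class PartitionDistribution {

    static <T> List<List<T>> distribute(List<T> partitions, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        List<List<T>> containers = new ArrayList<>();
        if (partitions.isEmpty()) {
            return containers;
        }
        // Concurrency is adjusted down so no container is left without a partition.
        int count = Math.min(concurrency, partitions.size());
        int base = partitions.size() / count;
        int remainder = partitions.size() % count;
        int next = 0;
        for (int i = 0; i < count; i++) {
            // The first `remainder` containers each take one extra partition.
            int size = base + (i < remainder ? 1 : 0);
            containers.add(new ArrayList<>(partitions.subList(next, next + size)));
            next += size;
        }
        return containers;
    }

    public static void main(String[] args) {
        List<String> six = List.of("t-0", "t-1", "t-2", "t-3", "t-4", "t-5");
        System.out.println(distribute(six, 3));                // [[t-0, t-1], [t-2, t-3], [t-4, t-5]]
        System.out.println(distribute(six.subList(0, 5), 3));  // [[t-0, t-1], [t-2, t-3], [t-4]]
        System.out.println(distribute(six.subList(0, 2), 3));  // [[t-0], [t-1]]
    }
}
```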

A clear example, with documentation, is available here