When to use ConcurrentKafkaListenerContainerFactory?
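I am new to Kafka. I went through the documentation, but I could not make sense of it. Can someone explain when to use the ConcurrentKafkaListenerContainerFactory class? I have used the KafkaConsumer class, but I see ConcurrentKafkaListenerContainerFactory being used in my current project. Please explain what it is for.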
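The Kafka Consumer API is not thread-safe. ConcurrentKafkaListenerContainerFactory provides a way to consume concurrently on top of the Kafka Consumer API, and also lets you set other Kafka consumer properties.
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.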
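To make the thread-safety point concrete, here is a minimal sketch of the kind of single-thread guard that leads to a ConcurrentModificationException when two threads touch the same consumer. GuardedClient and its poll method are hypothetical stand-ins written only for this illustration. They are not the KafkaConsumer API, and the real client's internals may differ.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class SingleThreadGuardDemo {

    /** Hypothetical stand-in for a client that tolerates only one calling thread at a time. */
    static class GuardedClient {
        // Thread currently inside poll(), or null when the client is idle.
        private final AtomicReference<Thread> owner = new AtomicReference<>();

        void poll(Runnable work) {
            // Fail fast instead of silently corrupting state if another thread is already inside.
            if (!owner.compareAndSet(null, Thread.currentThread())) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
            try {
                work.run();
            } finally {
                owner.set(null);
            }
        }
    }

    /** Returns true if a second thread's call is rejected while the first is still inside poll(). */
    public static boolean secondThreadRejected() {
        GuardedClient client = new GuardedClient();
        CountDownLatch inside = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // The first thread enters poll() and stays there until released.
        Thread first = new Thread(() -> client.poll(() -> {
            inside.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        first.start();

        boolean rejected = false;
        try {
            inside.await();          // wait until the first thread owns the client
            client.poll(() -> { });  // second, unsynchronized access from this thread
        } catch (ConcurrentModificationException e) {
            rejected = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();     // let the first thread finish
        }
        try {
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(secondThreadRejected()); // prints "true"
    }
}
```

The usual way around this is to give each thread its own consumer rather than sharing one, which is the job the concurrent container described below takes on.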
If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.
Spring-kafka
ConcurrentKafkaListenerContainerFactory is used to create containers for methods annotated with @KafkaListener.
Spring Kafka provides two MessageListenerContainer implementations:
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
Using ConcurrentMessageListenerContainer:
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
It has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.
If six TopicPartition instances are provided and the concurrency is 3, each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.
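The arithmetic in that paragraph can be checked with a small standalone sketch. PartitionDistribution and distribute are hypothetical names made up for this illustration. The code only reproduces the counts described above and is not taken from Spring Kafka's implementation, which may split the partitions differently.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionDistribution {

    /**
     * Returns how many partitions each container would receive when the
     * partitions are spread as evenly as possible (assumed for illustration).
     */
    public static List<Integer> distribute(int partitions, int concurrency) {
        if (partitions < 1 || concurrency < 1) {
            throw new IllegalArgumentException("partitions and concurrency must be positive");
        }
        // Concurrency is adjusted down when there are fewer partitions than containers.
        int containers = Math.min(concurrency, partitions);
        int base = partitions / containers;   // every container gets at least this many
        int extra = partitions % containers;  // the first 'extra' containers get one more

        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < containers; i++) {
            counts.add(i < extra ? base + 1 : base);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(distribute(6, 3)); // [2, 2, 2]
        System.out.println(distribute(5, 3)); // [2, 2, 1]
        System.out.println(distribute(2, 3)); // [1, 1]
    }
}
```

The last call shows the adjusted-down case: asking for three containers with only two partitions yields two containers with one partition each.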
A clear example, with documentation, is available here