Spring Kafka and number of topic consumers
In my Spring Boot/Kafka project, I have the following consumer configuration:
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        factory.setConcurrency(10);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
        return factory;
    }
}
This is my PostConsumer:
@Component
public class PostConsumer {

    @Autowired
    private PostService postService;

    @KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
    public void sendPost(ConsumerRecord<String, Post> consumerRecord) {
        postService.sendPost(consumerRecord.value());
    }
}
and application.properties:
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=groupname
spring.kafka.consumer.enable-auto-commit=false
kafka.topic.post.send=post.send
kafka.topic.post.sent=post.sent
kafka.topic.post.error=post.error
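As you can see, I added factory.setConcurrency(10); but it does not work. Every PostConsumer.sendPost call is executed in the same thread, named org.springframework.kafka.KafkaListenerEndpointContainer#1-8-C-1.
I would like to be able to control the number of concurrent PostConsumer.sendPost listeners so that they work in parallel. Please show me how this can be achieved with Spring Boot and Spring Kafka.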
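The issue here comes from the consistency we pursue in Spring Kafka when using the Apache Kafka Consumer. Concurrency is distributed across the partitions of the provided topics. If you have only one topic with a single partition, there is indeed not going to be any concurrency: the point is that all records from one partition are consumed in the same thread.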
There is some information on the matter in the docs: https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#_concurrentmessagelistenercontainer
If, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
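The arithmetic in that passage can be sketched in plain Java. This is only a model of the numbers quoted above, not Spring Kafka's actual implementation; the class name PartitionDistributionSketch and the method distribute are made up for the illustration.

```java
import java.util.Arrays;

// Hypothetical helper: models how many partitions each listener container
// would get, following the arithmetic in the quoted documentation.
final class PartitionDistributionSketch {

    static int[] distribute(int partitions, int concurrency) {
        if (partitions < 1 || concurrency < 1) {
            throw new IllegalArgumentException("partitions and concurrency must be >= 1");
        }
        // Concurrency above the partition count is adjusted down.
        int containers = Math.min(concurrency, partitions);
        int base = partitions / containers;   // every container gets at least this many
        int extra = partitions % containers;  // the first 'extra' containers get one more
        int[] perContainer = new int[containers];
        for (int i = 0; i < containers; i++) {
            perContainer[i] = base + (i < extra ? 1 : 0);
        }
        return perContainer;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(6, 3)));  // [2, 2, 2]
        System.out.println(Arrays.toString(distribute(5, 3)));  // [2, 2, 1]
        System.out.println(Arrays.toString(distribute(1, 10))); // [1]
    }
}
```

The last call corresponds to the situation in the question: with a single partition, a concurrency of 10 still leaves only one container doing the work.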
There is also the JavaDocs:
/**
* The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
* Messages from within the same partition will be processed sequentially.
* @param concurrency the concurrency.
*/
public void setConcurrency(int concurrency) {
To create and manage a partitioned topic:
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topicToTarget() {
    return new NewTopic(Constant.Topic.PUBLISH_MESSAGE_TOPIC_NAME, <no. of partitions>, (short) <replication factor>);
}
To send messages to different partitions, use the Partitioner interface:
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // Use the producer-side constant; ConsumerConfig resolves to the same key but is misleading here.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, <your custom Partitioner implementation>);
    return props;
}
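As a rough illustration of the decision a custom partitioner has to make, the sketch below maps a record key to a partition index in plain Java. The class KeyHashPartitionSketch and its method partitionFor are hypothetical; a real implementation would implement org.apache.kafka.clients.producer.Partitioner and put equivalent logic in its partition(...) method. The hash used here is String.hashCode(), chosen only for simplicity; Kafka's built-in partitioner hashes the serialized key with murmur2 instead.

```java
import java.util.Objects;

// Hypothetical stand-in for the core of a custom Partitioner:
// the same key always maps to the same partition index.
final class KeyHashPartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        Objects.requireNonNull(key, "key");
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be >= 1");
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```

Because a given key always lands on the same partition, records sharing a key are still processed sequentially by the listener; only records spread over different partitions can be handled in parallel.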
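To consume messages from multiple partitions with a single listener (the partitions are spread across the container's consumer threads, up to the configured concurrency, so the listener method can be called in parallel for records from different partitions, while records from the same partition are still handled sequentially):
@KafkaListener(topicPartitions = {
        @TopicPartition(
                topic = Constant.Topic.PUBLISH_MESSAGE_TOPIC_NAME,
                partitions = "#{kafkaGateway.createPartitionArray()}"
        )
}, groupId = "group.processor")
public void consumeWriteRequest(@Payload String data) {
    // your code
}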
The consumers here (if multiple instances are started) belong to the same group, so each message is handled by only one of them.