Spring 中 Apache Kafka 的侦听器容器是什么?
What is a listener container in Spring for Apache Kafka?
我了解到要使一个方法成为Kafka消息监听器的目标,我必须用@KafkaListener注解来标记这个方法。
此注释允许通过 containerFactory 元素指定 KafkaListenerContainerFactory。
下面是 Baeldung Spring Kafka 教程的一些片段。
KafkaConsumerConfig.java
private ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
...
return new DefaultKafkaConsumerFactory<>(props);
}
private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
MessageListener.java
@KafkaListener(
topics = "${message.topic.name}",
groupId = "foo",
containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo': " + message);
...
}
我不明白的是为什么我们需要一个监听器容器工厂。什么是侦听器容器?当以这种方式注释方法时会发生什么?
侦听器“容器”是一个跨多种技术(JMS、RabbitMQ、Kafka、AWS 等)的Spring概念。
侦听器定义为 POJO bean 方法,是容器的 属性(容器“包含”侦听器)。
容器负责与代理交互以接收消息并根据侦听器类型对每条消息或一批消息调用您的侦听器方法。
这意味着您的应用程序代码不必处理与代理交互的机制,您可以只专注于您的业务逻辑。
框架发现所有 @KafkaListener
方法并使用工厂为每个方法创建一个容器。
我了解到要使一个方法成为Kafka消息监听器的目标,我必须用@KafkaListener注解来标记这个方法。 此注释允许通过 containerFactory 元素指定 KafkaListenerContainerFactory。
下面是 Baeldung Spring Kafka 教程的一些片段。
KafkaConsumerConfig.java
private ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
...
return new DefaultKafkaConsumerFactory<>(props);
}
private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
MessageListener.java
@KafkaListener(
topics = "${message.topic.name}",
groupId = "foo",
containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo': " + message);
...
}
我不明白的是为什么我们需要一个监听器容器工厂。什么是侦听器容器?当以这种方式注释方法时会发生什么?
侦听器“容器”是一个跨多种技术(JMS、RabbitMQ、Kafka、AWS 等)的Spring概念。
侦听器定义为 POJO bean 方法,是容器的 属性(容器“包含”侦听器)。
容器负责与代理交互以接收消息并根据侦听器类型对每条消息或一批消息调用您的侦听器方法。
这意味着您的应用程序代码不必处理与代理交互的机制,您可以只专注于您的业务逻辑。
框架发现所有 @KafkaListener
方法并使用工厂为每个方法创建一个容器。