Spring Kafka 的 MessageListener 在 MessageListenerContainer 停止后仍在消耗
Spring Kafka's MessageListener still consuming after MessageListenerContainer is stopped
我的目标是为 Spring Kafka 创建我自己的 Camel 组件。
我已经成功创建并开始使用了。我还希望能够停止组件和消耗(使用 JMX,使用其他 Camel 路由,...),而不丢失任何消息。
为此,当停止 Camel 组件时,我需要停止一个 MessageListenerContainer
并最终停止 MessageListener
,它在 MessageListenerContainer
.
中注册
我的问题是当 MessageListenerContainer
停止时,MessageListener
仍在处理消息。
@Override
protected void doStart() throws Exception {
super.doStart();
if (kafkaMessageListenerContainer != null) {
return;
}
kafkaMessageListenerContainer = kafkaListenerContainerFactory.createContainer(endpoint.getTopicName());
kafkaMessageListenerContainer.setupMessageListener(messageListener());
kafkaMessageListenerContainer.start();
}
@Override
protected void doStop() throws Exception {
LOG.info("STOPPING kafkaMessageListenerContainer");
kafkaMessageListenerContainer.stop();
LOG.info("STOPPED kafkaMessageListenerContainer");
super.doStop();
}
private MessageListener<Object, Object> messageListener() {
return new MessageListener<Object, Object>() {
@Override public void onMessage(ConsumerRecord<Object, Object> data) {
LOG.info("Record received: {}", data.offset());
//...pass a message to Camel processing route
LOG.info("Record processed: {}", data.offset());
}
};
}
这是日志中的片段
{"time":"2020-11-27T14:01:57.047Z","message":"Record received: 2051","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"c5efc5db-5981-4477-925a-83ffece49572"}
{"time":"2020-11-27T14:01:57.153Z","message":"STOPPED kafkaMessageListenerContainer","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"Camel (camelContext) thread #2 - ShutdownTask","level":"INFO"}
{"time":"2020-11-27T14:01:57.153Z","message":"Route: testTopic.consumer shutdown complete, was consuming from: my-kafka://events.TestTopic","logger":"org.apache.camel.impl.DefaultShutdownStrategy","thread-id":"Camel (camelContext) thread #2 - ShutdownTask","level":"INFO"}
{"time":"2020-11-27T14:01:57.159Z","message":"Record processed: 2051","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"8c835691-ba8d-43c2-b3e0-90a2f768ed7f"}
{"time":"2020-11-27T14:01:57.165Z","message":"Record received: 2052","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"8c835691-ba8d-43c2-b3e0-90a2f768ed7f"}
{"time":"2020-11-27T14:01:57.275Z","message":"Record processed: 2052","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"f7bcebb4-9e5e-46a1-bc5b-569264914b05"}
...
我希望 MessageListener
在 MessageListenerContainer
正常停止后不会再消耗。我一定是遗漏了什么,有什么建议吗?
非常感谢!
我发现了导致我遇到问题的问题。
出于某种原因,我覆盖了不正确的 consumerFactory。
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);
return new DefaultKafkaConsumerFactory<>(props);
}
删除它并使用 application.yml 中配置的默认设置后,问题已解决。
我的目标是为 Spring Kafka 创建我自己的 Camel 组件。
我已经成功创建并开始使用了。我还希望能够停止组件和消耗(使用 JMX,使用其他 Camel 路由,...),而不丢失任何消息。
为此,当停止 Camel 组件时,我需要停止一个 MessageListenerContainer
并最终停止 MessageListener
,它在 MessageListenerContainer
.
我的问题是当 MessageListenerContainer
停止时,MessageListener
仍在处理消息。
@Override
protected void doStart() throws Exception {
super.doStart();
if (kafkaMessageListenerContainer != null) {
return;
}
kafkaMessageListenerContainer = kafkaListenerContainerFactory.createContainer(endpoint.getTopicName());
kafkaMessageListenerContainer.setupMessageListener(messageListener());
kafkaMessageListenerContainer.start();
}
@Override
protected void doStop() throws Exception {
LOG.info("STOPPING kafkaMessageListenerContainer");
kafkaMessageListenerContainer.stop();
LOG.info("STOPPED kafkaMessageListenerContainer");
super.doStop();
}
private MessageListener<Object, Object> messageListener() {
return new MessageListener<Object, Object>() {
@Override public void onMessage(ConsumerRecord<Object, Object> data) {
LOG.info("Record received: {}", data.offset());
//...pass a message to Camel processing route
LOG.info("Record processed: {}", data.offset());
}
};
}
这是日志中的片段
{"time":"2020-11-27T14:01:57.047Z","message":"Record received: 2051","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"c5efc5db-5981-4477-925a-83ffece49572"}
{"time":"2020-11-27T14:01:57.153Z","message":"STOPPED kafkaMessageListenerContainer","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"Camel (camelContext) thread #2 - ShutdownTask","level":"INFO"}
{"time":"2020-11-27T14:01:57.153Z","message":"Route: testTopic.consumer shutdown complete, was consuming from: my-kafka://events.TestTopic","logger":"org.apache.camel.impl.DefaultShutdownStrategy","thread-id":"Camel (camelContext) thread #2 - ShutdownTask","level":"INFO"}
{"time":"2020-11-27T14:01:57.159Z","message":"Record processed: 2051","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"8c835691-ba8d-43c2-b3e0-90a2f768ed7f"}
{"time":"2020-11-27T14:01:57.165Z","message":"Record received: 2052","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"8c835691-ba8d-43c2-b3e0-90a2f768ed7f"}
{"time":"2020-11-27T14:01:57.275Z","message":"Record processed: 2052","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"f7bcebb4-9e5e-46a1-bc5b-569264914b05"}
...
我希望 MessageListener
在 MessageListenerContainer
正常停止后不会再消耗。我一定是遗漏了什么,有什么建议吗?
非常感谢!
我发现了导致我遇到问题的问题。
出于某种原因,我覆盖了不正确的 consumerFactory。
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);
return new DefaultKafkaConsumerFactory<>(props);
}
删除它并使用 application.yml 中配置的默认设置后,问题已解决。