Spring 集成 Kafka 手动确认
Spring Integration Kafka Manual Acknowledgment
我在使用 KafkaTopicOffsetManager 手动确认时遇到问题。当调用 acknowledge() 时,主题开始反复发送垃圾邮件。 Kafka 已将 log.cleaner.enable 设置为 true,主题使用 cleanup.policy=compact。感谢您的帮助。
配置:
@Bean
public ZookeeperConfiguration zookeeperConfiguration() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(kafkaConfig.getZookeeperAddress());
zookeeperConfiguration.setClientId("clientId");
return zookeeperConfiguration;
}
@Bean
public ConnectionFactory connectionFactory() {
return new DefaultConnectionFactory(zookeeperConfiguration());
}
@Bean
public TestMessageHandler messageListener() {
return new TestMessageHandler();
}
@Bean
public OffsetManager offsetManager() {
ZookeeperConnect zookeeperConnect = new ZookeeperConnect(kafkaConfig.getZookeeperAddress());
OffsetManager offsetManager = new KafkaTopicOffsetManager(zookeeperConnect, kafkaConfig.getTopic() + "_OFFSET");
return offsetManager;
}
@Bean
public KafkaMessageListenerContainer kafkaMessageListenerContainer() {
KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer(connectionFactory(), kafkaConfig.getTopic());
kafkaMessageListenerContainer.setMessageListener(messageListener());
kafkaMessageListenerContainer.setOffsetManager(offsetManager());
return kafkaMessageListenerContainer;
}
听众:
public class TestMessageHandler implements AcknowledgingMessageListener {
private static final Logger logger = LoggerFactory.getLogger(TestMessageHandler.class);
@Override
public void onMessage(KafkaMessage message, Acknowledgment acknowledgment) {
logger.info(message.toString());
acknowledgment.acknowledge();
}
}
KafkaTopicOffsetManager 需要自己的主题来维护实际消费主题的偏移量。
如果您不想自己解码消息负载(在我看来这很痛苦),请从抽象 class AbstractDecodingAcknowledgingMessageListener 扩展监听器并提供 org.springframework.integration.kafka.serializer.common.StringDecoder 作为解码器。
public class TestMessageHandlerDecoding extends AbstractDecodingAcknowledgingMessageListener {
public TestMessageHandlerDecoding(Decoder keyDecoder, Decoder payloadDecoder) {
super(keyDecoder, payloadDecoder);
}
@Override
public void doOnMessage(Object key, Object payload, KafkaMessageMetadata metadata, Acknowledgment acknowledgment) {
LOGGER.info("payload={}",payload);
}
我在使用 KafkaTopicOffsetManager 手动确认时遇到问题。当调用 acknowledge() 时,主题开始反复发送垃圾邮件。 Kafka 已将 log.cleaner.enable 设置为 true,主题使用 cleanup.policy=compact。感谢您的帮助。
配置:
@Bean
public ZookeeperConfiguration zookeeperConfiguration() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(kafkaConfig.getZookeeperAddress());
zookeeperConfiguration.setClientId("clientId");
return zookeeperConfiguration;
}
@Bean
public ConnectionFactory connectionFactory() {
return new DefaultConnectionFactory(zookeeperConfiguration());
}
@Bean
public TestMessageHandler messageListener() {
return new TestMessageHandler();
}
@Bean
public OffsetManager offsetManager() {
ZookeeperConnect zookeeperConnect = new ZookeeperConnect(kafkaConfig.getZookeeperAddress());
OffsetManager offsetManager = new KafkaTopicOffsetManager(zookeeperConnect, kafkaConfig.getTopic() + "_OFFSET");
return offsetManager;
}
@Bean
public KafkaMessageListenerContainer kafkaMessageListenerContainer() {
KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer(connectionFactory(), kafkaConfig.getTopic());
kafkaMessageListenerContainer.setMessageListener(messageListener());
kafkaMessageListenerContainer.setOffsetManager(offsetManager());
return kafkaMessageListenerContainer;
}
听众:
public class TestMessageHandler implements AcknowledgingMessageListener {
private static final Logger logger = LoggerFactory.getLogger(TestMessageHandler.class);
@Override
public void onMessage(KafkaMessage message, Acknowledgment acknowledgment) {
logger.info(message.toString());
acknowledgment.acknowledge();
}
}
KafkaTopicOffsetManager 需要自己的主题来维护实际消费主题的偏移量。
如果您不想自己解码消息负载(在我看来这很痛苦),请从抽象 class AbstractDecodingAcknowledgingMessageListener 扩展监听器并提供 org.springframework.integration.kafka.serializer.common.StringDecoder 作为解码器。
public class TestMessageHandlerDecoding extends AbstractDecodingAcknowledgingMessageListener {
public TestMessageHandlerDecoding(Decoder keyDecoder, Decoder payloadDecoder) {
super(keyDecoder, payloadDecoder);
}
@Override
public void doOnMessage(Object key, Object payload, KafkaMessageMetadata metadata, Acknowledgment acknowledgment) {
LOGGER.info("payload={}",payload);
}