Kafka Spring consumer offsets are not committed with ConsumerRecordRecoverer
Technical details:
Versions:
spring-boot : 2.2.2.RELEASE
spring-kafka : 2.3.7.RELEASE
kafka broker : 2.3.1 (via amazon MSK)
Props:
auto.offset.reset: earliest
enable.auto.commit: false
isolation.level: read_committed
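For reference, these map onto Spring Boot's standard spring.kafka consumer keys; an illustrative application.properties sketch (not from the original post):

    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false
    # isolation.level has no dedicated Boot key, so it goes through the raw properties map
    spring.kafka.consumer.properties.isolation.level=read_committed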
Problem and behavior:
I have a KafkaListener using a ConcurrentKafkaListenerContainerFactory configured with a custom implementation of ConsumerRecordRecoverer. I have noticed that when this container does recover from some exception, the consumer offsets for the recovered message are not committed. Offsets are only committed when a message is processed successfully (i.e., with no recovery). The listener/consumer/container does, however, seem to retain the real offset in memory, since it moves past the recovered message as long as the application keeps running.
This becomes a problem if the Spring Boot application restarts while the last message was not successfully processed: the consumer resumes from the last offset that was actually committed, potentially re-processing messages that had already been recovered but whose offsets were never committed.
I confirmed this with a local test against an empty topic (the lag can also be verified with the standard CLI check shown after these steps):
- Before: the consumer group offset for partition 0 in Kafka is 0.
- Push a message that causes a listener exception and a recovery.
- After: the consumer group offset for partition 0 is still 0, and the group now has lag.
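A hypothetical version of that check, using the stock Kafka CLI (the bootstrap server and group id below are placeholders):

    # Shows the committed offset, end offset, and lag per partition for the group
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --describe --group my-group-id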
At this point, I assume I am missing some key configuration or setter on one of the Spring artifacts, but it is not clear to me what is missing. I had assumed this was the purpose of setting DefaultAfterRollbackProcessor#setCommitRecovered to true.
Code examples
KafkaConfiguration
@Configuration
public class KafkaConfig {

    // Note: the original snippet references kafkaAvroSerializer without showing
    // its declaration; assume it is an optional Avro serializer wired elsewhere.
    private Serializer<SpecificRecord> kafkaAvroSerializer;

    @Bean
    ConsumerRetryConfig retryConfig() {
        return new ConsumerRetryConfig();
    }

    @Bean
    public RetryTemplate consumerRetryTemplate(ConsumerRetryConfig consumerRetryConfig) {
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(consumerRetryConfig.getRetryWaitInterval());
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(consumerRetryConfig.getMaxRetries());
        retryTemplate.setRetryPolicy(retryPolicy);
        return retryTemplate;
    }

    @Bean
    @Lazy
    FiniteRequeueingRecovererConfig finiteRequeueingRecovererConfig() {
        return new FiniteRequeueingRecovererConfig();
    }

    @Bean
    @Lazy
    FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer(
        KafkaTemplate<String, SpecificRecord> kafkaTemplate,
        FiniteRequeueingRecovererConfig finiteRequeueingRecovererConfig
    ) {
        return new FiniteRequeueingRecordRecoverer(
            kafkaTemplate, finiteRequeueingRecovererConfig.getMaxRequeues());
    }

    @Bean
    @Lazy
    DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor(
        FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer,
        ConsumerRetryConfig consumerRetryConfig
    ) {
        DefaultAfterRollbackProcessor ret = new DefaultAfterRollbackProcessor(
            finiteRequeueingRecordRecoverer,
            new FixedBackOff(
                consumerRetryConfig.getRetryWaitInterval(),
                consumerRetryConfig.getMaxRetries()
            )
        );
        ret.setCommitRecovered(true);
        return ret;
    }

    @Bean
    public ProducerFactory<String, SpecificRecord> avroMessageProducerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> props = MapBuilder.<String, Object>builder()
            .putAll(kafkaProperties.buildProducerProperties())
            .put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString())
            .build();
        return (kafkaAvroSerializer == null) ?
            new DefaultKafkaProducerFactory<>(props) :
            new DefaultKafkaProducerFactory<>(props, new StringSerializer(), kafkaAvroSerializer);
    }

    @Bean
    public KafkaTemplate<String, SpecificRecord> avroMessageKafkaTemplate(ProducerFactory<String, SpecificRecord> avroMessageProducerFactory) {
        return new KafkaTemplate<>(avroMessageProducerFactory);
    }

    @Bean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<String, SpecificRecord> avroMessageProducerFactory) {
        return new KafkaTransactionManager<>(avroMessageProducerFactory);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> finiteRequeueingKafkaListenerContainerFactory(
        ConsumerFactory<Object, Object> consumerFactory,
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        KafkaTransactionManager<Object, Object> kafkaTransactionManager,
        DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor
    ) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
        factory.setStatefulRetry(true);
        factory.setAfterRollbackProcessor(finiteRequeueingRollbackProcessor);
        return factory;
    }

    @KafkaListener(
        id = "${some.listener-id}",
        topics = "${some.topic}",
        groupId = "${some.group-id}",
        containerFactory = "finiteRequeueingKafkaListenerContainerFactory"
    )
    public void consume(
        @Payload WebhookNotificationMessage message,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) throws Exception {
        // Do the thing, maybe throw an exception
    }
}
FiniteRequeueingRecordRecoverer
public class FiniteRequeueingRecordRecoverer implements ConsumerRecordRecoverer {

    private final Logger logger = LoggerLike.getLogger(FiniteRequeueingRecordRecoverer.class);

    private KafkaTemplate<String, SpecificRecord> kafkaTemplate;
    private Integer maxRequeues;

    public FiniteRequeueingRecordRecoverer(KafkaTemplate<String, SpecificRecord> kafkaTemplate, Integer maxRequeues) {
        this.kafkaTemplate = kafkaTemplate;
        this.maxRequeues = maxRequeues;
    }

    @Override
    public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {
        // Not sure the substance of this recoverer is relevant...but if so:
        // If the retry number in the avro record is < this.maxRequeues,
        // increment the retries and re-enqueue this message, then move on.
        // If retries have been exhausted, do not requeue; send to a dead letter or just abandon.
    }
}
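Since the accept() body is elided above, here is one hypothetical way it could be filled in. This sketch tracks the attempt count in a record header rather than inside the Avro payload; the "x-retries" header name and the dead-letter handling are illustrative assumptions, not from the original post (assumes imports for org.apache.kafka.common.header.Header, org.apache.kafka.clients.producer.ProducerRecord, and java.nio.ByteBuffer):

    @Override
    public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {
        // Read the attempt count from a header (0 if this is the first failure)
        Header retriesHeader = consumerRecord.headers().lastHeader("x-retries");
        int retries = (retriesHeader == null) ? 0 : ByteBuffer.wrap(retriesHeader.value()).getInt();
        if (retries < this.maxRequeues) {
            // Re-enqueue on the same topic with an incremented retry count
            ProducerRecord<String, SpecificRecord> requeued = new ProducerRecord<>(
                consumerRecord.topic(), (String) consumerRecord.key(), (SpecificRecord) consumerRecord.value());
            requeued.headers().add("x-retries",
                ByteBuffer.allocate(Integer.BYTES).putInt(retries + 1).array());
            this.kafkaTemplate.send(requeued);
        } else {
            // Retries exhausted: send to a dead-letter topic here, or abandon
            this.logger.error("Requeues exhausted at " + consumerRecord.topic()
                + "-" + consumerRecord.partition() + "@" + consumerRecord.offset(), e);
        }
    }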
DefaultAfterRollbackProcessor
needs a KafkaTemplate to send the offsets to the new transaction.
We should probably log a warning if commitRecovered is true and there is no KafkaTemplate.
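Concretely, that means handing the rollback processor the transactional template so the recovered record's offset can be sent within the new transaction. A minimal sketch of the adjusted bean, assuming the setKafkaTemplate setter available on DefaultAfterRollbackProcessor in the 2.3.x line:

    @Bean
    @Lazy
    DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor(
        FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer,
        ConsumerRetryConfig consumerRetryConfig,
        KafkaTemplate<String, SpecificRecord> avroMessageKafkaTemplate
    ) {
        DefaultAfterRollbackProcessor ret = new DefaultAfterRollbackProcessor(
            finiteRequeueingRecordRecoverer,
            new FixedBackOff(
                consumerRetryConfig.getRetryWaitInterval(),
                consumerRetryConfig.getMaxRetries()
            )
        );
        ret.setCommitRecovered(true);
        // Without a template, commitRecovered=true has no transaction to send the offset to
        ret.setKafkaTemplate(avroMessageKafkaTemplate);
        return ret;
    }

With the template in place, the committed offset should advance past the recovered message, so a restart no longer re-processes records that were already recovered.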