Kafka - 未调用重试和恢复
Kafka - Retries and Recovery not invoked
我已经实现了一个自定义侦听器,即不使用 @KafkaListener 注释,因为我的应用程序需要动态监听主题。我看到了迁移到 Spring kafka 2.6.x 的建议,但由于我被 Spring 5.[=] 困住了(至少现在),所以我无法升级27=] 这意味着我只能使用 Spring-kafka 2.2.x.
我的问题是,如何使用Spring-kafka 2.2.x实现重试、恢复和错误处理?
ConcurrentKafkaListenerContainerFactory listenerFactory = new ConcurrentKafkaListenerContainerFactory();
listenerFactory.setConsumerFactory(consumerFactory);
configurer.configure(listenerFactory, consumerFactory);
listenerFactory.setConcurrency(listenerConcurrency);
listenerFactory.setStatefulRetry(Boolean.TRUE);
listenerFactory.setBatchListener(isBatchListener);
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
listenerFactory.getContainerProperties().setAckOnError(false);
listenerFactory.setRetryTemplate(retryTemplate(kafkaEhCacheRetryManager));
listenerFactory.setRecoveryCallback(kafkaRecoverer);
我的重试模板如下所示:
RetryTemplate retryTemplate(EhCacheCacheManager kafkaEhCacheRetryManager) {
ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(initialIntervalForRetries);
exponentialBackOffPolicy.setSleeper(new ThreadWaitSleeper());
exponentialBackOffPolicy.setMultiplier(2.0);
exponentialBackOffPolicy.setMaxInterval(maxIntervalForRetries);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryContextCache(new KafkaEhRetryContextCache(kafkaEhCacheRetryManager));
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
// KafkaTransactionalRetryPolicy extends SimpleRetryPolicy
KafkaTransactionalRetryPolicy retryPolicy = new KafkaTransactionalRetryPolicy(kafkaTemplate);
retryPolicy.setMaxAttempts(maxAttempts);
retryTemplate.setRetryPolicy(kafkaTransactionalRetryPolicy);
return retryTemplate;
}
我的听众看起来像:
public class MyKafkaListener implements MessageListener<String, String> {
@Override
@Transactional(value = "chainedKafkaTransactionManager")
public void onMessage(final ConsumerRecord<String, String> consumerRecord){
throw new RuntimeException("thrown out of out anger");
}
}
使用此配置:
spring.kafka:
bootstrap-servers: ${service.kakfa.host}
admin:
client-id: test-consumers
bootstrap-servers: ${service.kakfa.host}
consumer:
bootstrap-servers: ${service.kakfa.host}
group-id: local-consumers
client-id: local-consumers
auto-offset-reset: earliest
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
isolation-level: read_committed
producer:
bootstrap-servers: ${service.kakfa.host}
client-id: local-producer
acks: all
retries: 3
transaction-id-prefix: local-producer-tx-
properties:
enable.idempotence: true
transactional.id: tran-id-1-
max.in.flight.requests.per.connection: 5
listener.concurrency: 1
我在 Whosebug 上看到了几个关于如何执行此操作的示例,但 none 到目前为止有效。
您尝试使用的重试机制仅适用于 @KafakListener
s。它内置于用于调用侦听器 POJO 的侦听器适配器中。
在较新的版本中,SeekToCurrentErrorHandler
和 DefaultAfterRollbackProcessor
有一个后退(自 2.3 起),消除了在侦听器级别重试模板的需要,有利于在容器级别重试.
对于您自己的侦听器,您必须在侦听器代码本身中使用 RetryTemplate
。
顺便说一句,Spring 5.1.x 不再受支持 https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions#supported-versions
我已经实现了一个自定义侦听器,即不使用 @KafkaListener 注释,因为我的应用程序需要动态监听主题。我看到了迁移到 Spring kafka 2.6.x 的建议,但由于我被 Spring 5.[=] 困住了(至少现在),所以我无法升级27=] 这意味着我只能使用 Spring-kafka 2.2.x.
我的问题是,如何使用Spring-kafka 2.2.x实现重试、恢复和错误处理?
ConcurrentKafkaListenerContainerFactory listenerFactory = new ConcurrentKafkaListenerContainerFactory();
listenerFactory.setConsumerFactory(consumerFactory);
configurer.configure(listenerFactory, consumerFactory);
listenerFactory.setConcurrency(listenerConcurrency);
listenerFactory.setStatefulRetry(Boolean.TRUE);
listenerFactory.setBatchListener(isBatchListener);
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
listenerFactory.getContainerProperties().setAckOnError(false);
listenerFactory.setRetryTemplate(retryTemplate(kafkaEhCacheRetryManager));
listenerFactory.setRecoveryCallback(kafkaRecoverer);
我的重试模板如下所示:
RetryTemplate retryTemplate(EhCacheCacheManager kafkaEhCacheRetryManager) {
ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(initialIntervalForRetries);
exponentialBackOffPolicy.setSleeper(new ThreadWaitSleeper());
exponentialBackOffPolicy.setMultiplier(2.0);
exponentialBackOffPolicy.setMaxInterval(maxIntervalForRetries);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryContextCache(new KafkaEhRetryContextCache(kafkaEhCacheRetryManager));
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
// KafkaTransactionalRetryPolicy extends SimpleRetryPolicy
KafkaTransactionalRetryPolicy retryPolicy = new KafkaTransactionalRetryPolicy(kafkaTemplate);
retryPolicy.setMaxAttempts(maxAttempts);
retryTemplate.setRetryPolicy(kafkaTransactionalRetryPolicy);
return retryTemplate;
}
我的听众看起来像:
public class MyKafkaListener implements MessageListener<String, String> {
@Override
@Transactional(value = "chainedKafkaTransactionManager")
public void onMessage(final ConsumerRecord<String, String> consumerRecord){
throw new RuntimeException("thrown out of out anger");
}
}
使用此配置:
spring.kafka:
bootstrap-servers: ${service.kakfa.host}
admin:
client-id: test-consumers
bootstrap-servers: ${service.kakfa.host}
consumer:
bootstrap-servers: ${service.kakfa.host}
group-id: local-consumers
client-id: local-consumers
auto-offset-reset: earliest
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
isolation-level: read_committed
producer:
bootstrap-servers: ${service.kakfa.host}
client-id: local-producer
acks: all
retries: 3
transaction-id-prefix: local-producer-tx-
properties:
enable.idempotence: true
transactional.id: tran-id-1-
max.in.flight.requests.per.connection: 5
listener.concurrency: 1
我在 Whosebug 上看到了几个关于如何执行此操作的示例,但 none 到目前为止有效。
您尝试使用的重试机制仅适用于 @KafakListener
s。它内置于用于调用侦听器 POJO 的侦听器适配器中。
在较新的版本中,SeekToCurrentErrorHandler
和 DefaultAfterRollbackProcessor
有一个后退(自 2.3 起),消除了在侦听器级别重试模板的需要,有利于在容器级别重试.
对于您自己的侦听器,您必须在侦听器代码本身中使用 RetryTemplate
。
顺便说一句,Spring 5.1.x 不再受支持 https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions#supported-versions