Kafka - 未调用重试和恢复

Kafka - Retries and Recovery not invoked

我已经实现了一个自定义侦听器,即不使用 @KafkaListener 注释,因为我的应用程序需要动态监听主题。我看到了迁移到 Spring kafka 2.6.x 的建议,但由于我被 Spring 5.[=] 困住了(至少现在),所以我无法升级27=] 这意味着我只能使用 Spring-kafka 2.2.x.

我的问题是,如何使用Spring-kafka 2.2.x实现重试、恢复和错误处理?

ConcurrentKafkaListenerContainerFactory listenerFactory = new ConcurrentKafkaListenerContainerFactory();
listenerFactory.setConsumerFactory(consumerFactory);
configurer.configure(listenerFactory, consumerFactory);
listenerFactory.setConcurrency(listenerConcurrency); 
listenerFactory.setStatefulRetry(Boolean.TRUE);
listenerFactory.setBatchListener(isBatchListener);
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
listenerFactory.getContainerProperties().setAckOnError(false);
listenerFactory.setRetryTemplate(retryTemplate(kafkaEhCacheRetryManager)); 
listenerFactory.setRecoveryCallback(kafkaRecoverer);

我的重试模板如下所示:

RetryTemplate retryTemplate(EhCacheCacheManager kafkaEhCacheRetryManager) {

ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(initialIntervalForRetries);
exponentialBackOffPolicy.setSleeper(new ThreadWaitSleeper());
exponentialBackOffPolicy.setMultiplier(2.0);
exponentialBackOffPolicy.setMaxInterval(maxIntervalForRetries);

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryContextCache(new KafkaEhRetryContextCache(kafkaEhCacheRetryManager));
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

// KafkaTransactionalRetryPolicy extends SimpleRetryPolicy
KafkaTransactionalRetryPolicy retryPolicy = new KafkaTransactionalRetryPolicy(kafkaTemplate);
retryPolicy.setMaxAttempts(maxAttempts);
retryTemplate.setRetryPolicy(kafkaTransactionalRetryPolicy);

return retryTemplate;

}

我的听众看起来像:

public class MyKafkaListener implements MessageListener<String, String> {


    @Override
    @Transactional(value = "chainedKafkaTransactionManager")
    public void onMessage(final ConsumerRecord<String, String> consumerRecord){
       throw new RuntimeException("thrown out of out anger");
    }

}

使用此配置:

spring.kafka:
  bootstrap-servers: ${service.kakfa.host}
  admin:
    client-id: test-consumers
    bootstrap-servers: ${service.kakfa.host}
  consumer:
    bootstrap-servers: ${service.kakfa.host}
    group-id: local-consumers
    client-id: local-consumers
    auto-offset-reset: earliest
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    enable-auto-commit: false
    isolation-level: read_committed
  producer:
    bootstrap-servers: ${service.kakfa.host}
    client-id: local-producer
    acks: all
    retries: 3
    transaction-id-prefix: local-producer-tx-
    properties:
      enable.idempotence: true
      transactional.id: tran-id-1-
      max.in.flight.requests.per.connection: 5
  listener.concurrency: 1

我在 Whosebug 上看到了几个关于如何执行此操作的示例,但 none 到目前为止有效。

您尝试使用的重试机制仅适用于 @KafakListeners。它内置于用于调用侦听器 POJO 的侦听器适配器中。

在较新的版本中,SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor 有一个后退(自 2.3 起),消除了在侦听器级别重试模板的需要,有利于在容器级别重试.

对于您自己的侦听器,您必须在侦听器代码本身中使用 RetryTemplate

顺便说一句,Spring 5.1.x 不再受支持 https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions#supported-versions