Spring Kafka 重试日志记录

Spring Kafka Retry Logging

我需要从 kafka 主题中消费,在记录上做一些工作并使用 spring-kafka 2.1 生成另一个主题。7.Other requiremenrs 是事务性的,仅一次语义,重试和错误 handling.On 提交记录失败 我应该重试 3 次,记录每条重试消息以重试主题 anf 在所有重试失败时将记录发送到死信主题。我查看了 https://github.com/spring-projects/spring-kafka/issues/575,它提供了解决问题的详细信息。我正在努力解决的问题是如何记录每个重试消息,其中包含消费者偏移量、它试图提交的主题等详细信息。有没有办法从重试回调中获取这些消息?下面的 retrylistener 片段注册了一个 org.springframework.kafka.listener.LoggingErrorHandler,它被设置为容器 属性 到 ConcurrentKafkaListenerContainerFactory ?

         @Bean
         public RetryListener retryListener(KafkaTemplate<String,SpecificRecord> kafkaTemplate) {
             return new RetryListenerSupport() {

                public void onError(RetryContext context, RetryCallback callback, Throwable throwable) {
                    int retryCount =context.getRetryCount();
                    kafkaTemplate) .send(new ProducerRecord<String,SpecificRecord>("topic_name",record));
                }
             };
         }

RetryContextRetryingMessageListenerAdapter 中填充了一些有用的信息:

context.setAttribute(CONTEXT_RECORD, record);
switch (RetryingMessageListenerAdapter.this.delegateType) {
    case ACKNOWLEDGING_CONSUMER_AWARE:
        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
        context.setAttribute(CONTEXT_CONSUMER, consumer);
        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
        break;
    case ACKNOWLEDGING:
        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
        break;
    case CONSUMER_AWARE:
        context.setAttribute(CONTEXT_CONSUMER, consumer);
        RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
        break;
    case SIMPLE:
        RetryingMessageListenerAdapter.this.delegate.onMessage(record);
}