Better way of error handling in Kafka Consumer

I have a Spring Boot application configured with spring-kafka in which I want to handle the various errors that can occur while listening to a topic. If any message is missed / not consumed because of deserialization or any other exception, it should be retried 2 times and after that the message should be logged to an error file. I have two approaches to follow:

Approach 1 (using SeekToCurrentErrorHandler with DeadLetterPublishingRecoverer):

@Autowired
KafkaTemplate<String,Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource()
            .getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));

    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
            (r, e) -> {
                if (e instanceof FooException) {
                    return new TopicPartition(r.topic() + ".DLT", r.partition());
                }
                // the destination resolver must return a TopicPartition for every record,
                // so route all other failures to the same dead-letter topic as well
                return new TopicPartition(r.topic() + ".DLT", r.partition());
            });
    ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));

    factory.setErrorHandler(errorHandler);
    return factory;
}

But for this we need to add the topic (a new .DLT topic), and only then can we log it to a file.

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
    return new KafkaAdmin(configs);
}
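
The dead-letter topic itself can be declared as a NewTopic bean so that the KafkaAdmin above provisions it on startup. A minimal sketch (the partition and replica counts are assumptions; the DLT should normally have at least as many partitions as the original topic, because the recoverer publishes to the same partition by default):

@Bean
public NewTopic dltTopic() {
    // the name must match the destination resolved by the DeadLetterPublishingRecoverer above
    return TopicBuilder.name(MY_TOPIC + ".DLT")
            .partitions(1)
            .replicas(1)
            .build();
}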
    
@KafkaListener(topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
        @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {

    logger.error(exceptionStackTrace);
}

Approach 2 (using a custom SeekToCurrentErrorHandler):

@Bean
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource()
            .getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));

    factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
    factory.setRetryTemplate(retryTemplate());
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    // 3 attempts in total = the initial delivery plus 2 retries
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
    return retryTemplate;
}
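
backOffPolicy() is not shown above; a minimal sketch using Spring Retry's FixedBackOffPolicy (the one-second interval is an assumption):

private BackOffPolicy backOffPolicy() {
    // wait a fixed interval between in-memory retry attempts (the interval is an assumption)
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000L);
    return backOffPolicy;
}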

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private static final int MAX_RETRY_ATTEMPTS = 2;

    CustomSeekToCurrentErrorHandler() {
        super(MAX_RETRY_ATTEMPTS);
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        try {
            if (!records.isEmpty()) {
                log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());

                super.handle(exception, records, consumer, container);
            }
        } catch (SerializationException e) {
            log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
        }
    }

}

Can anyone suggest the standard way of implementing this kind of functionality? With the first approach we do see the overhead of creating the .DLT topic and an extra @KafkaListener. With the second approach we can directly log our consumer record exception.

With the first approach, it is not necessary to use a DeadLetterPublishingRecoverer; you can use any ConsumerRecordRecoverer that you want; in fact, the default recoverer simply logs the failed message.

/**
 * Construct an instance with the default recoverer which simply logs the record after
 * the backOff returns STOP for a topic/partition/offset.
 * @param backOff the {@link BackOff}.
 * @since 2.3
 */
public SeekToCurrentErrorHandler(BackOff backOff) {
    this(null, backOff);
}

And, in the FailedRecordTracker...

if (recoverer == null) {
    this.recoverer = (rec, thr) -> {
        
        ...

        logger.error(thr, "Backoff "
            + (failedRecord == null
                ? "none"
                : failedRecord.getBackOffExecution())
            + " exhausted for " + ListenerUtils.recordToString(rec));
    };
}
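
So, for the "retry twice, then log to an error file" requirement, the first approach can be used without a dead-letter topic at all, by passing a plain logging recoverer. A minimal sketch (the bean name and the errorLogger are assumptions; the logger can be routed to a dedicated error file by the logging configuration):

@Bean
public SeekToCurrentErrorHandler loggingErrorHandler() {
    // after the initial delivery plus 2 retries fail, the record is handed to the
    // recoverer, which only logs it; route "errorLogger" to an error file via logback/log4j
    return new SeekToCurrentErrorHandler(
            (record, exception) -> errorLogger.error("Recovery exhausted for {}-{}@{}: {}",
                    record.topic(), record.partition(), record.offset(), exception.getMessage()),
            new FixedBackOff(0L, 2L));
}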

The backOff (and limit to retries) was added to the error handler after retry was added in the listener adapter, so it is "newer" (and preferred).

Also, using in-memory retry can cause issues with rebalancing if long BackOffs are used.

Finally, only the SeekToCurrentErrorHandler can deal with deserialization problems (via the ErrorHandlingDeserializer).

EDIT

Use the ErrorHandlingDeserializer together with the SeekToCurrentErrorHandler. Deserialization exceptions are considered fatal and the recoverer is called immediately.

See the documentation.

Here's a simple Spring Boot application that demonstrates it:

@SpringBootApplication
public class So63236346Application {


    private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So63236346Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
    }

    @Bean
    ErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
                + ex.getMessage()));
    }

    @KafkaListener(id = "so63236346", topics = "so63236346")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so63236346", "{\"field\":\"value1\"}");
            template.send("so63236346", "junk");
            template.send("so63236346", "{\"field\":\"value2\"}");
        };
    }

}
package com.example.demo;

public class Thing {

    private String field;

    public Thing() {
    }

    public Thing(String field) {
        this.field = field;
    }

    public String getField() {
        return this.field;
    }

    public void setField(String field) {
        this.field = field;
    }

    @Override
    public String toString() {
        return "Thing [field=" + this.field + "]";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing

Result

Thing [field=value1]
2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application   : so63236346-0@7
Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
2020-08-10 14:30:14.782  INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
Thing [field=value2]
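
Since the factory in the question is built programmatically from a properties map rather than from application.properties, the equivalent ErrorHandlingDeserializer setup can also be added to that map. A rough sketch (the Thing target type is taken from the demo above; the rest mirrors the question's configuration):

Map<String, Object> config = appProperties.getSource().getProperties();
// wrap the real deserializer so that deserialization failures reach the container's error handler
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Thing.class);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));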

The expectation was to log any exception that we might encounter at the container level as well as at the listener level.

Without retrying, I have done the error handling as follows:

If we encounter any exception at the container level, we should be able to log the message payload with the error description, seek past that offset, skip it and go on consuming the next offset. Although this is only done for DeserializationException here, the rest of the exceptions also need the seek, and the offsets need to be skipped.

@Component
public class KafkaContainerErrorHandler implements ErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(KafkaContainerErrorHandler.class);

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];

        // modify below logic according to your topic nomenclature
        String topics = s.substring(0, s.lastIndexOf('-'));
        int offset = Integer.parseInt(s.split("offset ")[1]);
        int partition = Integer.parseInt(s.substring(s.lastIndexOf('-') + 1).split(" at")[0]);

        logger.error("...");
        TopicPartition topicPartition = new TopicPartition(topics, partition);
        logger.info("Skipping {} - {} offset {}",  topics, partition, offset);
        consumer.seek(topicPartition, offset + 1);
    }

    @Override
    public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {

    }
}


 factory.setErrorHandler(kafkaContainerErrorHandler);

If we encounter any exception at the @KafkaListener level, then I configure my listener with my custom error handler and log the exception together with the message, as shown below:

@Bean("customErrorHandler")
    public KafkaListenerErrorHandler listenerErrorHandler() {
        return (m, e) -> {
            logger.error(...);
            return m;
        };
    }
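
The bean name can then be referenced from the listener, so that exceptions thrown by the listener method reach this handler. A minimal sketch (topic, group id and payload type reuse the placeholders from the question; process() is a hypothetical business method):

@KafkaListener(topics = MY_TOPIC, groupId = MY_ID, errorHandler = "customErrorHandler")
public void listen(SomeClassName payload) {
    // any exception thrown here is routed to the customErrorHandler bean above
    process(payload); // hypothetical business method
}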