spring-kafka 的 Kafka 死信队列 (DLQ)
Dead letter queue (DLQ) for Kafka with spring-kafka
在 Spring Boot 2.0 应用程序中使用 spring-kafka 实现 死信队列 (DLQ) 概念的最佳方法是什么 2.1.x 将某些 bean 的 @KafkaListener 方法处理失败的所有消息发送到某个预定义的 Kafka DLQ 主题而不丢失单个消息?
所以消耗的Kafka记录是:
- 处理成功,
- 处理失败,发送到DLQ主题,
- 处理失败,没有发送到DLQ主题(由于意外问题)所以会被监听者再次消费
我尝试使用 ErrorHandler 的自定义实现创建侦听器容器,发送记录无法使用 KafkaTemplate 处理到 DLQ 主题。使用禁用的 auto-commit 和 RECORD AckMode.
spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ErrorHandler {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@Value("${dlqTopic}")
private String dlqTopic;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
log.error("Error, sending to DLQ...");
kafkaTemplate.send(dlqTopic, record.key(), record.value());
}
}
似乎此实现不能保证项目 #3。如果在 DlqErrorHandler 中抛出异常记录将不会被侦听器再次消费。
使用事务性侦听器容器会有帮助吗?
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
有什么方便的方法可以使用 Spring Kafka 实现 DLQ 概念?
更新 2018/03/28
感谢 Gary Russell 的回答,我能够通过如下实现 DlqErrorHandler 实现所需的行为
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
...
@Override
public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
Consumerrecord<?, ? record = records.get(0);
try {
kafkaTemplate.send("dlqTopic", record.key, record.value());
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
// Other records may be from other partitions, so seek to current offset for other partitions too
// ...
} catch (Exception e) {
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
// Other records may be from other partitions, so seek to current offset for other partitions too
// ...
throw new KafkaException("Seek to current after exception", thrownException);
}
}
}
如果消费者轮询 returns 3 条记录 (1, 2, 3) 并且无法处理第二条记录,则这样:
- 1 将被处理
- 2将处理失败并发送到DLQ
- 3 感谢 consumer seek to record.offset() + 1,它会传递给 listener
如果发送到 DLQ 失败,消费者会查找 record.offset() 并且记录将 re-delivered 发送给侦听器(并且发送到 DLQ 可能会被取消)。
更新 2021/04/30
因为 Spring 原生支持 Kafka 2.7.0 non-blocking retries and dead letter topics。
看例子:https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt
重试通常应 non-blocking(在单独的主题中完成)并延迟:
- 不扰乱 real-time 交通;
- 不增加调用次数,本质上是垃圾请求;
- 可观察性(获取重试次数和其他元数据)。
使用 Kafka 实现 non-blocking 重试和 DLT 功能通常需要设置额外的主题并创建和配置相应的监听器。
发生异常时,它会寻找消费者,以便在下一次轮询时重新传送所有未处理的记录。
您可以使用相同的技术(例如子类)写入 DLQ 并在 DLQ 写入失败时查找当前偏移量(和其他未处理的),如果 DLQ 写入成功则仅查找剩余的记录。
编辑
DeadLetterPublishingRecoverer
是在发布此答案几个月后添加的。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
在 Spring Boot 2.0 应用程序中使用 spring-kafka 实现 死信队列 (DLQ) 概念的最佳方法是什么 2.1.x 将某些 bean 的 @KafkaListener 方法处理失败的所有消息发送到某个预定义的 Kafka DLQ 主题而不丢失单个消息?
所以消耗的Kafka记录是:
- 处理成功,
- 处理失败,发送到DLQ主题,
- 处理失败,没有发送到DLQ主题(由于意外问题)所以会被监听者再次消费
我尝试使用 ErrorHandler 的自定义实现创建侦听器容器,发送记录无法使用 KafkaTemplate 处理到 DLQ 主题。使用禁用的 auto-commit 和 RECORD AckMode.
spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ErrorHandler {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@Value("${dlqTopic}")
private String dlqTopic;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
log.error("Error, sending to DLQ...");
kafkaTemplate.send(dlqTopic, record.key(), record.value());
}
}
似乎此实现不能保证项目 #3。如果在 DlqErrorHandler 中抛出异常记录将不会被侦听器再次消费。
使用事务性侦听器容器会有帮助吗?
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
有什么方便的方法可以使用 Spring Kafka 实现 DLQ 概念?
更新 2018/03/28
感谢 Gary Russell 的回答,我能够通过如下实现 DlqErrorHandler 实现所需的行为
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
...
@Override
public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
Consumerrecord<?, ? record = records.get(0);
try {
kafkaTemplate.send("dlqTopic", record.key, record.value());
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
// Other records may be from other partitions, so seek to current offset for other partitions too
// ...
} catch (Exception e) {
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
// Other records may be from other partitions, so seek to current offset for other partitions too
// ...
throw new KafkaException("Seek to current after exception", thrownException);
}
}
}
如果消费者轮询 returns 3 条记录 (1, 2, 3) 并且无法处理第二条记录,则这样:
- 1 将被处理
- 2将处理失败并发送到DLQ
- 3 感谢 consumer seek to record.offset() + 1,它会传递给 listener
如果发送到 DLQ 失败,消费者会查找 record.offset() 并且记录将 re-delivered 发送给侦听器(并且发送到 DLQ 可能会被取消)。
更新 2021/04/30
因为 Spring 原生支持 Kafka 2.7.0 non-blocking retries and dead letter topics。
看例子:https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt
重试通常应 non-blocking(在单独的主题中完成)并延迟:
- 不扰乱 real-time 交通;
- 不增加调用次数,本质上是垃圾请求;
- 可观察性(获取重试次数和其他元数据)。
使用 Kafka 实现 non-blocking 重试和 DLT 功能通常需要设置额外的主题并创建和配置相应的监听器。
发生异常时,它会寻找消费者,以便在下一次轮询时重新传送所有未处理的记录。
您可以使用相同的技术(例如子类)写入 DLQ 并在 DLQ 写入失败时查找当前偏移量(和其他未处理的),如果 DLQ 写入成功则仅查找剩余的记录。
编辑
DeadLetterPublishingRecoverer
是在发布此答案几个月后添加的。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters