Spring Kafka Error Handler how to avoid endless loop
I hope someone can give me a hint about what I am doing wrong here. I wrote a custom error handler for a batch listener that is supposed to seek past the received records and send them to a DLQ. I have tried a lot, but I cannot get it to work: my current implementation ends up in an endless loop, receiving the same records over and over again. Here is the error handler code:
@Service("consumerAwareListenerErrorHandlerImpl")
public class ConsumerAwareListenerErrorHandlerImpl implements ConsumerAwareListenerErrorHandler {

    private final Executor executor;
    private final KafkaListenerEndpointRegistry registry;
    private final TaskScheduler scheduler;

    @Autowired
    public ConsumerAwareListenerErrorHandlerImpl(KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {
        this.scheduler = scheduler;
        this.executor = new SimpleAsyncTaskExecutor();
        this.registry = registry;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        MessageHeaders headers = message.getHeaders();
        List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
        List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
        List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
        Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
        for (int i = 0; i < topics.size(); i++) {
            int index = i;
            offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                    (k, v) -> (v == null) ? offsets.get(index) : Math.max(v, offsets.get(index)));
        }
        offsetsToReset.forEach((k, v) -> consumer.seek(k, v));

        if (!(exception.getCause() instanceof DeserializationException)) {
            //pauseAndRestartContainer();
        }
        acknowledgment.acknowledge();
        consumer.commitSync();
        return null;
    }
}
You have to seek to offset + 1 to get "past" the failed record; seeking to the offset itself causes it to be replayed.
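As a minimal sketch of that fix, here is the question's handleError rewritten to seek one past the highest received offset per partition. It is assumed to drop into the class shown above; the acknowledgment, commit, and the actual DLQ publish are left out for brevity:

@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    MessageHeaders headers = message.getHeaders();
    List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
    List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
    List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);

    // Track the highest received offset per partition, plus one, so the next poll
    // starts after the failed batch instead of replaying it.
    Map<TopicPartition, Long> seekTo = new HashMap<>();
    for (int i = 0; i < topics.size(); i++) {
        int index = i;
        seekTo.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                (tp, current) -> (current == null)
                        ? offsets.get(index) + 1
                        : Math.max(current, offsets.get(index) + 1));
    }

    // ... publish the failed records to the DLQ here before moving on ...

    seekTo.forEach((tp, offset) -> consumer.seek(tp, offset));
    return null;
}

Seeking to the last offset + 1 per partition positions the consumer after everything delivered in the failed batch, so the next poll returns new records instead of the same ones.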