如果设置spring.kafka.listener.ack-mode=time,会重试吗?或者在指定的确认模式下重试工作?
if set spring.kafka.listener.ack-mode=time, will retry work? or retry work in specified ack-mode?
spring-kafka SeekToCurrentErrorHandler
如果设置 spring.kafka.listener.ack-mode=time
,会重试吗?或者在指定的确认模式下重试工作,例如 MANUAL MANUAL_IMMEDIATE.
@Value("${${sync.kafka.header.source.id}}")
private String headerSourceId;
@Value("${sync.kafka.from.id}")
private String syncKafkaFromId;
@Value("${sync.kafka.to.id}")
private String syncKafkaToId;
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private ConsumerFactory consumerFactory;
@KafkaListener(topics = "${sync.kafka.topics}")
public void listen(ConsumerRecord<?, ?> record,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String messageKey,
@Header(value = "${sync.kafka.header.source.id}", required = false) String customHeader,
@Headers MessageHeaders messageHeaders) {
log.info("- - - - - - - - - - - - - - -");
log.info("topic: {}", topic);
log.info("message key: {}", messageKey);
log.info("custom header: {}", customHeader);
log.info("messageHeaders: {}", messageHeaders);
ProducerRecord<Object, Object> ProducerRecord = new ProducerRecord(topic,record.key(), record.value());
ProducerRecord.headers().add(new RecordHeader(headerSourceId, syncKafkaFromId.getBytes()));
kafkaTemplate.send(ProducerRecord);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
// or factory.setRetryTemplate(aRetryTemplate);
// and factory.setRecoveryCallback(aRecoveryCallback);
return factory;
}
在使用 SeekToCurrentErrorHandler
时,您应该只使用手动、记录或批处理确认模式。您还应该将 ackOnError
容器 属性 设置为 false
并将 enable.auto.commit
设置为 false
.
spring-kafka SeekToCurrentErrorHandler
如果设置 spring.kafka.listener.ack-mode=time
,会重试吗?或者在指定的确认模式下重试工作,例如 MANUAL MANUAL_IMMEDIATE.
@Value("${${sync.kafka.header.source.id}}")
private String headerSourceId;
@Value("${sync.kafka.from.id}")
private String syncKafkaFromId;
@Value("${sync.kafka.to.id}")
private String syncKafkaToId;
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private ConsumerFactory consumerFactory;
@KafkaListener(topics = "${sync.kafka.topics}")
public void listen(ConsumerRecord<?, ?> record,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String messageKey,
@Header(value = "${sync.kafka.header.source.id}", required = false) String customHeader,
@Headers MessageHeaders messageHeaders) {
log.info("- - - - - - - - - - - - - - -");
log.info("topic: {}", topic);
log.info("message key: {}", messageKey);
log.info("custom header: {}", customHeader);
log.info("messageHeaders: {}", messageHeaders);
ProducerRecord<Object, Object> ProducerRecord = new ProducerRecord(topic,record.key(), record.value());
ProducerRecord.headers().add(new RecordHeader(headerSourceId, syncKafkaFromId.getBytes()));
kafkaTemplate.send(ProducerRecord);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
// or factory.setRetryTemplate(aRetryTemplate);
// and factory.setRecoveryCallback(aRecoveryCallback);
return factory;
}
在使用 SeekToCurrentErrorHandler
时,您应该只使用手动、记录或批处理确认模式。您还应该将 ackOnError
容器 属性 设置为 false
并将 enable.auto.commit
设置为 false
.