Kafka Event 在发生异常时收到 10 次
Kafka Event received 10 times when exceptions occur
我们的应用程序使用 Spring Kafka,我对消费者有疑问。
后端“生产者”（Producer）
// Publishes the given domain object to the "topicA" Kafka topic.
// NOTE(review): the result of send(...) is discarded, so delivery failures are
// not observed here — consider handling the returned future's outcome.
public void sendEvent(final DomainObject obj) {
this.kafkaTemplate.send("topicA", obj);
}
后端“消费者”
@KafkaListener(
topics = {"topicA"}
)
public void onSendEvent(final DomainObject obj) {
this.customService.doSomething(obj)
}
如果没有抛出异常,消费者会收到事件 1 次,但是当“doSomething”方法抛出异常(RuntimeException)时,我的消费者会收到事件 10 次,我不明白为什么。
我尝试在我的消费者中设置一个带有 retryPolicy 的重试模板,但没有任何变化:
/** Builds the RetryTemplate used around listener invocations, backed by getSimpleRetryPolicy(). */
private RetryTemplate retryTemplate() {
    final RetryTemplate template = new RetryTemplate();
    template.setRetryPolicy(getSimpleRetryPolicy());
    return template;
}
/** Retry policy with maxAttempts = 1, i.e. a single attempt and no listener-level retries. */
private SimpleRetryPolicy getSimpleRetryPolicy() {
    final SimpleRetryPolicy singleAttempt = new SimpleRetryPolicy(1);
    return singleAttempt;
}
这是我的消费者配置:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, DomainEvent> consumerFactory(@Autowired KafkaProperties kafkaProperties,
@Autowired KafkaConsumerCustomization consumerCustomization) {
final JsonDeserializer<DomainEvent> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.getTypeMapper().setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
jsonDeserializer.configure(Map.of(JsonDeserializer.TYPE_MAPPINGS, consumerCustomization.getFlatIdClassMapping()), false);
jsonDeserializer.addTrustedPackages(consumerCustomization.getTrustedPackages());
kafkaProperties.getProperties().put(ConsumerConfig.GROUP_ID_CONFIG, consumerCustomization.getConsumerId());
kafkaProperties.getProperties().put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DomainEvent> kafkaListenerContainerFactory(
@Autowired ConsumerFactory<String, DomainEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, DomainEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);
factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.setConsumerFactory(consumerFactory);
return factory;
}
private RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(getSimpleRetryPolicy());
return retryTemplate;
}
private SimpleRetryPolicy getSimpleRetryPolicy() {
return new SimpleRetryPolicy(1);
}
public static class KafkaConsumerCustomization {
private final String[] trustedPackages;
private final Map<String, Class<?>> idClassMapping;
private final String consumerId;
public KafkaConsumerCustomization(String[] trustedPackages, final Map<String, Class<?>> idClassMapping, String consumerId) {
this.trustedPackages = trustedPackages;
this.idClassMapping = idClassMapping;
this.consumerId = consumerId;
}
public String[] getTrustedPackages() {
return trustedPackages;
}
public String getFlatIdClassMapping() {
return idClassMapping.entrySet().stream()
.map(stringClassEntry -> stringClassEntry.getKey() + ":" + stringClassEntry.getValue().getCanonicalName())
.collect(Collectors.joining(","));
}
public Map<String, Class<?>> getIdClassMapping() {
return idClassMapping;
}
public String getConsumerId() {
return consumerId;
}
}
}
和我的生产者配置:
/**
 * Producer-side Kafka configuration: String keys, JSON-serialized DomainEvent values.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /**
     * KafkaTemplate wired from the producerFactory bean.
     *
     * FIX: inject the ProducerFactory bean instead of self-invoking the @Bean
     * method (the original relied on CGLIB bean-method proxying and contained
     * redundant double parentheses around the argument).
     */
    @Bean
    public KafkaTemplate<String, DomainEvent> kafkaTemplate(@Autowired final ProducerFactory<String, DomainEvent> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    /** Producer factory built from the application's Kafka properties plus String/JSON serializers. */
    @Bean
    public ProducerFactory<String, DomainEvent> producerFactory(final KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties(), new StringSerializer(), new JsonSerializer<>());
    }
}
感谢您的帮助
根据文档,我建议使用 SeekToCurrentErrorHandler 重试或跳过失败的记录
Starting with version 2.2, the SeekToCurrentErrorHandler can now recover (skip) a record that keeps failing. By default, after 10 failures, the failed record will be logged (ERROR). You can configure the handler with a custom recoverer (BiConsumer) and/or max failures.
// An explicit SeekToCurrentErrorHandler: retries the failed record, and after
// maxFailures (here 3) hands it to the recoverer BiConsumer instead of
// redelivering it the default 10 times.
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
// recover after 3 failures - e.g. send to a dead-letter topic
}, 3);
对于批量监听器
The SeekToCurrentBatchErrorHandler seeks each partition to the first record in each partition in the batch so the whole batch is replayed. This error handler does not support recovery because the framework cannot know which message in the batch is failing.
我们的应用程序使用 Spring Kafka,我对消费者有疑问。
后端“生产者”（Producer）
// Publishes the given domain object to the "topicA" Kafka topic.
// NOTE(review): the result of send(...) is discarded, so delivery failures are
// not observed here — consider handling the returned future's outcome.
public void sendEvent(final DomainObject obj) {
this.kafkaTemplate.send("topicA", obj);
}
后端“消费者”
@KafkaListener(
topics = {"topicA"}
)
public void onSendEvent(final DomainObject obj) {
this.customService.doSomething(obj)
}
如果没有抛出异常,消费者会收到事件 1 次,但是当“doSomething”方法抛出异常(RuntimeException)时,我的消费者会收到事件 10 次,我不明白为什么。
我尝试在我的消费者中设置一个带有 retryPolicy 的重试模板,但没有任何变化:
/** Builds the RetryTemplate used around listener invocations, backed by getSimpleRetryPolicy(). */
private RetryTemplate retryTemplate() {
    final RetryTemplate template = new RetryTemplate();
    template.setRetryPolicy(getSimpleRetryPolicy());
    return template;
}
/** Retry policy with maxAttempts = 1, i.e. a single attempt and no listener-level retries. */
private SimpleRetryPolicy getSimpleRetryPolicy() {
    final SimpleRetryPolicy singleAttempt = new SimpleRetryPolicy(1);
    return singleAttempt;
}
这是我的消费者配置:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, DomainEvent> consumerFactory(@Autowired KafkaProperties kafkaProperties,
@Autowired KafkaConsumerCustomization consumerCustomization) {
final JsonDeserializer<DomainEvent> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.getTypeMapper().setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
jsonDeserializer.configure(Map.of(JsonDeserializer.TYPE_MAPPINGS, consumerCustomization.getFlatIdClassMapping()), false);
jsonDeserializer.addTrustedPackages(consumerCustomization.getTrustedPackages());
kafkaProperties.getProperties().put(ConsumerConfig.GROUP_ID_CONFIG, consumerCustomization.getConsumerId());
kafkaProperties.getProperties().put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DomainEvent> kafkaListenerContainerFactory(
@Autowired ConsumerFactory<String, DomainEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, DomainEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);
factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.setConsumerFactory(consumerFactory);
return factory;
}
private RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(getSimpleRetryPolicy());
return retryTemplate;
}
private SimpleRetryPolicy getSimpleRetryPolicy() {
return new SimpleRetryPolicy(1);
}
public static class KafkaConsumerCustomization {
private final String[] trustedPackages;
private final Map<String, Class<?>> idClassMapping;
private final String consumerId;
public KafkaConsumerCustomization(String[] trustedPackages, final Map<String, Class<?>> idClassMapping, String consumerId) {
this.trustedPackages = trustedPackages;
this.idClassMapping = idClassMapping;
this.consumerId = consumerId;
}
public String[] getTrustedPackages() {
return trustedPackages;
}
public String getFlatIdClassMapping() {
return idClassMapping.entrySet().stream()
.map(stringClassEntry -> stringClassEntry.getKey() + ":" + stringClassEntry.getValue().getCanonicalName())
.collect(Collectors.joining(","));
}
public Map<String, Class<?>> getIdClassMapping() {
return idClassMapping;
}
public String getConsumerId() {
return consumerId;
}
}
}
和我的生产者配置:
/**
 * Producer-side Kafka configuration: String keys, JSON-serialized DomainEvent values.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /**
     * KafkaTemplate wired from the producerFactory bean.
     *
     * FIX: inject the ProducerFactory bean instead of self-invoking the @Bean
     * method (the original relied on CGLIB bean-method proxying and contained
     * redundant double parentheses around the argument).
     */
    @Bean
    public KafkaTemplate<String, DomainEvent> kafkaTemplate(@Autowired final ProducerFactory<String, DomainEvent> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    /** Producer factory built from the application's Kafka properties plus String/JSON serializers. */
    @Bean
    public ProducerFactory<String, DomainEvent> producerFactory(final KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties(), new StringSerializer(), new JsonSerializer<>());
    }
}
感谢您的帮助
根据文档,我建议使用 SeekToCurrentErrorHandler 重试或跳过失败的记录
Starting with version 2.2, the SeekToCurrentErrorHandler can now recover (skip) a record that keeps failing. By default, after 10 failures, the failed record will be logged (ERROR). You can configure the handler with a custom recoverer (BiConsumer) and/or max failures.
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
// recover after 3 failures - e.g. send to a dead-letter topic
}, 3);
对于批量监听器
The SeekToCurrentBatchErrorHandler seeks each partition to the first record in each partition in the batch so the whole batch is replayed. This error handler does not support recovery because the framework cannot know which message in the batch is failing.