当我使用 ConcurrentMessageListenerContainer 时,如何跳过 kafka 中有错误的消息?
How to skip a msg that have error in kafka when i use ConcurrentMessageListenerContainer?
我正在使用spring-kafka-2.1。10.RELEASE尝试跳过错误数据但失败了,
陷入无限消耗循环
有什么消息吗?
如能提供帮助,不胜感激
简单代码:
=========================
@Service
public class testConsumerFactoryService {
@Autowired
private PlatformTransactionManager tx;
@Autowired
private TestRepository testRepository; // table only have 1 column , String
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@PostConstruct
public void consumerFactoryTest() {
ContainerProperties containerProps = new ContainerProperties("test_1");
containerProperties.setTransactionManager(this.tx);
containerProperties
.setMessageListener(new ConsumerAwareMessageListener<String, String>() {
@Override
public void onMessage(final ConsumerRecord<String, String> record,
final Consumer<?, ?> consumer) {
final String rec = record.value();
//add the 'rec' string to TestRepository's entity , aka: tests
try {
this.testRepository.save(tests); // >> 1. try to save in DB, but failed
} catch (final Throwable e) {
System.out.println("error !!" + e);
System.out.println("##records:" + record);
consumer.commitSync(); // >> 2. try to commit offset, but seems not working ?
System.out.println("##after:");
}
}
}
);
final ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
this.consumerFactory, containerProperties);
container.start();
}
}
===============================
配置:
===============================
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//use AckMode.RECORD
factory.getContainerProperties().setAckMode(AckMode.RECORD);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
props.put(ConsumerConfig.GROUP_ID_CONFIG, 'Test1');
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
在您的 ConcurrentKafkaListenerContainerFactory 中,只需关联一个 SeekToCurrentErrorHandler:
factory.setErrorHandler(
new SeekToCurrentErrorHandler(
(rec, ex) -> {
log.error(
"An exception happened during a message processing: {}", ex);
// good to put something like a dispatch to a DLQ etc
}));
有了这个,您将继续进行下一个偏移量,并且您的应用程序不会因错误消息等而被阻止。
我正在使用spring-kafka-2.1。10.RELEASE尝试跳过错误数据但失败了,
陷入无限消耗循环
有什么消息吗?
如能提供帮助,不胜感激
简单代码:
=========================
@Service
public class testConsumerFactoryService {
@Autowired
private PlatformTransactionManager tx;
@Autowired
private TestRepository testRepository; // table only have 1 column , String
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@PostConstruct
public void consumerFactoryTest() {
ContainerProperties containerProps = new ContainerProperties("test_1");
containerProperties.setTransactionManager(this.tx);
containerProperties
.setMessageListener(new ConsumerAwareMessageListener<String, String>() {
@Override
public void onMessage(final ConsumerRecord<String, String> record,
final Consumer<?, ?> consumer) {
final String rec = record.value();
//add the 'rec' string to TestRepository's entity , aka: tests
try {
this.testRepository.save(tests); // >> 1. try to save in DB, but failed
} catch (final Throwable e) {
System.out.println("error !!" + e);
System.out.println("##records:" + record);
consumer.commitSync(); // >> 2. try to commit offset, but seems not working ?
System.out.println("##after:");
}
}
}
);
final ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
this.consumerFactory, containerProperties);
container.start();
}
}
=============================== 配置: ===============================
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//use AckMode.RECORD
factory.getContainerProperties().setAckMode(AckMode.RECORD);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
props.put(ConsumerConfig.GROUP_ID_CONFIG, 'Test1');
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
在您的 ConcurrentKafkaListenerContainerFactory 中,只需关联一个 SeekToCurrentErrorHandler:
factory.setErrorHandler(
new SeekToCurrentErrorHandler(
(rec, ex) -> {
log.error(
"An exception happened during a message processing: {}", ex);
// good to put something like a dispatch to a DLQ etc
}));
有了这个,您将继续进行下一个偏移量,并且您的应用程序不会因错误消息等而被阻止。