spring 使用过滤策略和手动提交的 kafka 批处理示例
Examples on spring kafka batch processing with filter strategy and manual commit
我打算使用 spring kafka 批处理监听器进行批处理。我正在为这两种情况寻找一些样本。
- 我们如何使用批处理实现过滤记录策略?更新:来自文档 - “此外,还提供了一个 FilteringBatchMessageListenerAdapter,供您使用批处理消息侦听器时使用。”不清楚。我没有看到任何容器工厂方法来设置此 filterbatchmessagelisteneradapter 对象或过滤器实现。
这是我的批量侦听器过滤策略代码:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecords) {
//log.info("Retrieved the record {} from the partition {} with offset {}", consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());
return true;
}
});
return factory;
}
- 一旦我们在消费者中检索了一批消息并且所有消息都得到了处理,我们如何才能进行手动偏移量提交。在批处理过程中,如果出现任何故障,只想将该消息推送到错误 topic.But 最后我想一次提交整个批处理。
现在我想到的另一个问题是上述场景如何适用于单个消费者和多个消费者。
假设情况 1:单身消费者
假设我们有一个包含 5 个分区的主题。当我们订阅该主题时,我们假设我们从主题中获得了 100 条消息,其中每个分区有 20 条消息。如果我们要提交这些消息偏移量,确认对象是否保存最后一条消息的每个分区和最后偏移量?
案例2:多个消费者
对于case1中提到的相同输入,如果我们启用分区计数相等的消费者数,ack对象是否保存分区和最后一条消息偏移量?
你能帮我解决这个问题吗?
见FilteringBatchMessageListenerAdapter
https://docs.spring.io/spring-kafka/docs/current/reference/html/#filtering-messages
使用批处理处理异常的最简单方法是使用 RecoveringBatchErrorHandler
和 DeadLetterPublishingRecoverer
。抛出一个BatchListenerFailedException
表示batch中哪条记录失败;成功记录的偏移量被提交,其余记录(包括失败的记录)将被重新传送,直到重试(如果配置)用完,失败的记录将转到死信主题,其余记录将被重新传送。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh
是的,当批次被确认时,批次中每个分区的最新偏移量 (+1) 被提交。
如果您有多个消费者,分区将分布在这些消费者中。
我打算使用 spring kafka 批处理监听器进行批处理。我正在为这两种情况寻找一些样本。
- 我们如何使用批处理实现过滤记录策略?更新:来自文档 - “此外,还提供了一个 FilteringBatchMessageListenerAdapter,供您使用批处理消息侦听器时使用。”不清楚。我没有看到任何容器工厂方法来设置此 filterbatchmessagelisteneradapter 对象或过滤器实现。
这是我的批量侦听器过滤策略代码:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecords) {
//log.info("Retrieved the record {} from the partition {} with offset {}", consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());
return true;
}
});
return factory;
}
- 一旦我们在消费者中检索了一批消息并且所有消息都得到了处理,我们如何才能进行手动偏移量提交。在批处理过程中,如果出现任何故障,只想将该消息推送到错误 topic.But 最后我想一次提交整个批处理。
现在我想到的另一个问题是上述场景如何适用于单个消费者和多个消费者。
假设情况 1:单身消费者
假设我们有一个包含 5 个分区的主题。当我们订阅该主题时,我们假设我们从主题中获得了 100 条消息,其中每个分区有 20 条消息。如果我们要提交这些消息偏移量,确认对象是否保存最后一条消息的每个分区和最后偏移量?
案例2:多个消费者
对于case1中提到的相同输入,如果我们启用分区计数相等的消费者数,ack对象是否保存分区和最后一条消息偏移量?
你能帮我解决这个问题吗?
见
FilteringBatchMessageListenerAdapter
https://docs.spring.io/spring-kafka/docs/current/reference/html/#filtering-messages使用批处理处理异常的最简单方法是使用
RecoveringBatchErrorHandler
和DeadLetterPublishingRecoverer
。抛出一个BatchListenerFailedException
表示batch中哪条记录失败;成功记录的偏移量被提交,其余记录(包括失败的记录)将被重新传送,直到重试(如果配置)用完,失败的记录将转到死信主题,其余记录将被重新传送。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh
是的,当批次被确认时,批次中每个分区的最新偏移量 (+1) 被提交。
如果您有多个消费者,分区将分布在这些消费者中。