如何从kafka主题中一条一条地消费消息
How to consume messages one by one from a kafka topic
我用一个分区构建了一个kafka主题。
kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1
本主题可以推送多条消息。
但我希望单个消费者(对于给定组)一条一条地处理这些消息。
spring:
application:
name: file-consumer
cloud:
stream:
kafka:
binder:
type: kafka
brokers: localhost
defaultBrokerPort: 29092
defaultZkPort: 32181
configuration:
max.request.size: 300000
max.message.bytes: 300000
bindings:
fileWriteBindingInput:
consumer:
autoCommitOffset: false
bindings:
fileWriteBindingInput:
binder: kafka
destination: files.write
group: ${spring.application.name}
contentType: 'text/plain'
和Java示例代码
@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
// I Would like here to synchronize the processing of messages one by one
// But, If many messages are pushed to this topic (single partition), they will be processed asynchronously event if I didn't yet acknowledge the current message
acknowledgment.acknowledge();
}
我的配置中缺少什么?
我想,当消息未被确认(偏移量未增加)时,none其他消息从同一分区被消耗。
如果启用autoCommitOffset
(这是默认设置),那么活页夹将已经确认每条记录。因此,当它到达您的 StreamListener
时,记录已经被确认。
更正:上述关于StreamListener
的说法并不完全正确。当侦听器退出时自动确认完成。
由于您只有一个分区,因此您将按照发送到该主题分区的相同顺序获取消息。
您可以禁用 autoCommitOffset
,在这种情况下,您可以使用手动确认。
您可以将此消费者配置 max.poll.records
设置为 1 默认为 500
max.poll.records
The maximum number of records returned in a single call to poll().
不确认消息与停止发送下一条消息无关。
您不能将消息传递给另一个线程并稍后再确认;如果你想要单线程处理,你必须在监听线程上完成所有处理。
我用一个分区构建了一个kafka主题。
kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1
本主题可以推送多条消息。
但我希望单个消费者(对于给定组)一条一条地处理这些消息。
spring:
application:
name: file-consumer
cloud:
stream:
kafka:
binder:
type: kafka
brokers: localhost
defaultBrokerPort: 29092
defaultZkPort: 32181
configuration:
max.request.size: 300000
max.message.bytes: 300000
bindings:
fileWriteBindingInput:
consumer:
autoCommitOffset: false
bindings:
fileWriteBindingInput:
binder: kafka
destination: files.write
group: ${spring.application.name}
contentType: 'text/plain'
和Java示例代码
@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
// I Would like here to synchronize the processing of messages one by one
// But, If many messages are pushed to this topic (single partition), they will be processed asynchronously event if I didn't yet acknowledge the current message
acknowledgment.acknowledge();
}
我的配置中缺少什么?
我想,当消息未被确认(偏移量未增加)时,none其他消息从同一分区被消耗。
如果启用autoCommitOffset
(这是默认设置),那么活页夹将已经确认每条记录。因此,当它到达您的 StreamListener
时,记录已经被确认。
更正:上述关于StreamListener
的说法并不完全正确。当侦听器退出时自动确认完成。
由于您只有一个分区,因此您将按照发送到该主题分区的相同顺序获取消息。
您可以禁用 autoCommitOffset
,在这种情况下,您可以使用手动确认。
您可以将此消费者配置 max.poll.records
设置为 1 默认为 500
max.poll.records
The maximum number of records returned in a single call to poll().
不确认消息与停止发送下一条消息无关。
您不能将消息传递给另一个线程并稍后再确认;如果你想要单线程处理,你必须在监听线程上完成所有处理。