Spring Cloud Stream 功能手册确认 - KafkaHeaders.ACKNOWLEDGMENT 不可用
Spring Cloud Stream Functional Manual Acknowledgement - KafkaHeaders.ACKNOWLEDGMENT not available
我正在使用 Spring Cloud Webflux 和 Spring Cloud Streams 功能接口来处理我的 kafka 处理。
如果我按顺序进行处理并且终止应用程序,它会返回要处理的消息,该消息按预期工作,因为没有消息丢失。但是,如果我尝试并行执行,它似乎正在确认 Kafka,这是可以理解的,因为它现在是一个单独的线程,因此想转向手动确认。
我的代码:
- application.yml(相关部分)
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
autoAddPartitions: true
minPartitionCount: 2
bindings:
receiver-in-0:
binder: kafka
destination: topic-1
content-type: text/plain;charset=UTF-8
group: input-group-1
consumer:
autoCommitOffset: false
spring.cloud.stream.function.definition: receiver
- 收件人代码
public Consumer<Flux<Message<String>>> receiver() throws IOException {
return (sink -> {
sink
.onBackpressureBuffer()
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe((record)->{
Flux<Action> executor = new
//Internal code which does transformation and provides a flux for execution (names changed)
IncomingMessage().process(record);
if(executor != null) {
Disposable disposable=null;
disposable= executor.subscribe(
(action)->{
try {
//Process execute does the processing on the modified data (names changed)
Process.execute(action);
Acknowledgment acknowledgment = record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if(acknowledgment !=null) {
acknowledgment.acknowledge();
}
}
catch(Exception e) {
log.fatal(e.getMsg());
}
},
(e)->{
log.fatal(e.getMsg());
}
});
if(disposable != null) {
disposable.dispose();
}
}
});
});
}
这里 record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
行总是给出 null,所以我假设 autoCommitOffset: false
不起作用,我尝试将以下配置也放在绑定部分但无济于事。
receiver-in-0:
binder: kafka
destination: topic-1
content-type: text/plain;charset=UTF-8
group: input-group-1
autoCommitOffset: false
我的要求是,如果我终止应用程序,即使在并行情况下,它也应该继续从第一条未确认的消息中读取消息。
没有得到确认的问题 header 是因为标签的位置不正确。因为它纯粹是一个 kafka 活页夹 属性 。添加了以下 属性
spring:
cloud:
stream:
kafka:
bindings:
receiver-in-0:
consumer:
autoCommitOffset: false
频道名称应与函数调用中的频道名称相同。
或者默认值,可以设置。
spring:
cloud:
stream:
kafka:
default:
consumer:
autoCommitOffset: false
然而,并行处理传入的流量可能不是一个好主意,因为消息可能会被丢弃,因为一些稍后的消息可以得到确认。它需要更多的确认逻辑,而不仅仅是设置参数确认。
我正在使用 Spring Cloud Webflux 和 Spring Cloud Streams 功能接口来处理我的 kafka 处理。
如果我按顺序进行处理并且终止应用程序,它会返回要处理的消息,该消息按预期工作,因为没有消息丢失。但是,如果我尝试并行执行,它似乎正在确认 Kafka,这是可以理解的,因为它现在是一个单独的线程,因此想转向手动确认。
我的代码:
- application.yml(相关部分)
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
autoAddPartitions: true
minPartitionCount: 2
bindings:
receiver-in-0:
binder: kafka
destination: topic-1
content-type: text/plain;charset=UTF-8
group: input-group-1
consumer:
autoCommitOffset: false
spring.cloud.stream.function.definition: receiver
- 收件人代码
public Consumer<Flux<Message<String>>> receiver() throws IOException {
return (sink -> {
sink
.onBackpressureBuffer()
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe((record)->{
Flux<Action> executor = new
//Internal code which does transformation and provides a flux for execution (names changed)
IncomingMessage().process(record);
if(executor != null) {
Disposable disposable=null;
disposable= executor.subscribe(
(action)->{
try {
//Process execute does the processing on the modified data (names changed)
Process.execute(action);
Acknowledgment acknowledgment = record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if(acknowledgment !=null) {
acknowledgment.acknowledge();
}
}
catch(Exception e) {
log.fatal(e.getMsg());
}
},
(e)->{
log.fatal(e.getMsg());
}
});
if(disposable != null) {
disposable.dispose();
}
}
});
});
}
这里 record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
行总是给出 null,所以我假设 autoCommitOffset: false
不起作用,我尝试将以下配置也放在绑定部分但无济于事。
receiver-in-0:
binder: kafka
destination: topic-1
content-type: text/plain;charset=UTF-8
group: input-group-1
autoCommitOffset: false
我的要求是,如果我终止应用程序,即使在并行情况下,它也应该继续从第一条未确认的消息中读取消息。
没有得到确认的问题 header 是因为标签的位置不正确。因为它纯粹是一个 kafka 活页夹 属性 。添加了以下 属性
spring:
cloud:
stream:
kafka:
bindings:
receiver-in-0:
consumer:
autoCommitOffset: false
频道名称应与函数调用中的频道名称相同。 或者默认值,可以设置。
spring:
cloud:
stream:
kafka:
default:
consumer:
autoCommitOffset: false
然而,并行处理传入的流量可能不是一个好主意,因为消息可能会被丢弃,因为一些稍后的消息可以得到确认。它需要更多的确认逻辑,而不仅仅是设置参数确认。