Project Reactor Kafka:不阻塞地在Flux结束时执行动作
Project Reactor Kafka: Perform action at the end of Flux without blocking
我正在开发一个使用 project-reactor
Kafka API 响应式连接到 Kafka-brokers
的应用程序。用例是有一个输入主题,其中包含要处理的文件的文件路径。应用程序读取每个文件,对其进行处理,创建已处理消息的流量并将其推送到输出主题。要求是文件在处理后必须删除,处理后的消息应推送到输出主题。因此,必须在处理完每个文件并将消息的流量推送到输出主题后执行删除操作。
public Flux<?> flux() {
return KafkaReceiver
.create(receiverOptions(Collections.singleton(sourceTopic)))
.receive()
.flatMap(m -> transform(m.value()).map(x -> SenderRecord.create(x,
m.receiverOffset())))
.as(sender::send)
.doOnNext(m -> {
m.correlationMetadata().acknowledge();
deleteFile(path);
}).doOnCancel(() -> close());
}
*transform() 方法启动文件路径中的文件处理(m.value()) 和returns 大量消息。
问题是文件甚至在所有消息被推送到输出主题之前就被删除了。因此,如果失败,重试时原始文件不可用。
由于 path
变量似乎在整个管道(方法输入参数?)中都可以访问,您可以在单独的 doFinally
中删除该文件。您需要筛选 onComplete
或 cancel
SignalType
,因为您不想在出现故障时删除文件。
如果您不想在取消时删除文件,另一个选项是 doOnComplete
。
我正在开发一个使用 project-reactor
Kafka API 响应式连接到 Kafka-brokers
的应用程序。用例是有一个输入主题,其中包含要处理的文件的文件路径。应用程序读取每个文件,对其进行处理,创建已处理消息的流量并将其推送到输出主题。要求是文件在处理后必须删除,处理后的消息应推送到输出主题。因此,必须在处理完每个文件并将消息的流量推送到输出主题后执行删除操作。
public Flux<?> flux() {
return KafkaReceiver
.create(receiverOptions(Collections.singleton(sourceTopic)))
.receive()
.flatMap(m -> transform(m.value()).map(x -> SenderRecord.create(x,
m.receiverOffset())))
.as(sender::send)
.doOnNext(m -> {
m.correlationMetadata().acknowledge();
deleteFile(path);
}).doOnCancel(() -> close());
}
*transform() 方法启动文件路径中的文件处理(m.value()) 和returns 大量消息。
问题是文件甚至在所有消息被推送到输出主题之前就被删除了。因此,如果失败,重试时原始文件不可用。
由于 path
变量似乎在整个管道(方法输入参数?)中都可以访问,您可以在单独的 doFinally
中删除该文件。您需要筛选 onComplete
或 cancel
SignalType
,因为您不想在出现故障时删除文件。
如果您不想在取消时删除文件,另一个选项是 doOnComplete
。