Reactor Flux - 仅在完成时从 Publisher 发出
Reactor Flux - Only emit from Publisher on completion
我有一些 Reactor Kafka 代码通过 KafkaReceiver
读取事件并通过 1 个或多个 KafkaSenders
写入 1..许多下游消息,这些消息被串联成一个 Publisher
.一切都很好,但我想做的只是在完成时从这个串联的发件人 Flux
发出一个事件(即,它已完成为任何给定事件写入所有下游主题,因此它不会发出每个元素写入 Kafka 下游直到完成之前的任何内容)。这样我就可以 sample()
并定期提交偏移量,知道每当 sample()
恰好触发并且我为传入事件提交偏移量我已经处理了每个事件的所有下游消息我是承诺抵消。似乎我可以以某种方式使用 pauseUntilOther()
或 then()
,但我不太清楚如何给出我的代码和特定用例。任何想法或建议表示赞赏,谢谢。
主要发布商代码:
this.kafkaReceiver.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(this.scheduler)
.flatMap(this::processEvent)
.sample(Duration.ofMillis(10))
.concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
.subscribe();
调用 processEvent()
返回的串联 KafkaSenders:
return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
.doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event);
听起来 Flux.last()
就是您要查找的内容:
return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
.doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event)
.last();
然后您的 .sample(Duration.ofMillis(10))
将做任何可用的作为发送给这些经纪人的一批或几批中的最后一项。最后,您的 commitReceiverOffset()
会正确地提交最后的任何内容。
有关详细信息,请参阅其 JavaDocs:
/**
* Emit the last element observed before complete signal as a {@link Mono}, or emit
* {@link NoSuchElementException} error if the source was empty.
* For a passive version use {@link #takeLast(int)}
*
* <p>
* <img class="marble" src="doc-files/marbles/last.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards elements before the last.
*
* @return a {@link Mono} with the last value in this {@link Flux}
*/
public final Mono<T> last() {
和大理石图:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/doc-files/marbles/last.svg
我有一些 Reactor Kafka 代码通过 KafkaReceiver
读取事件并通过 1 个或多个 KafkaSenders
写入 1..许多下游消息,这些消息被串联成一个 Publisher
.一切都很好,但我想做的只是在完成时从这个串联的发件人 Flux
发出一个事件(即,它已完成为任何给定事件写入所有下游主题,因此它不会发出每个元素写入 Kafka 下游直到完成之前的任何内容)。这样我就可以 sample()
并定期提交偏移量,知道每当 sample()
恰好触发并且我为传入事件提交偏移量我已经处理了每个事件的所有下游消息我是承诺抵消。似乎我可以以某种方式使用 pauseUntilOther()
或 then()
,但我不太清楚如何给出我的代码和特定用例。任何想法或建议表示赞赏,谢谢。
主要发布商代码:
this.kafkaReceiver.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(this.scheduler)
.flatMap(this::processEvent)
.sample(Duration.ofMillis(10))
.concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
.subscribe();
调用 processEvent()
返回的串联 KafkaSenders:
return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
.doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event);
听起来 Flux.last()
就是您要查找的内容:
return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
.doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event)
.last();
然后您的 .sample(Duration.ofMillis(10))
将做任何可用的作为发送给这些经纪人的一批或几批中的最后一项。最后,您的 commitReceiverOffset()
会正确地提交最后的任何内容。
有关详细信息,请参阅其 JavaDocs:
/**
* Emit the last element observed before complete signal as a {@link Mono}, or emit
* {@link NoSuchElementException} error if the source was empty.
* For a passive version use {@link #takeLast(int)}
*
* <p>
* <img class="marble" src="doc-files/marbles/last.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards elements before the last.
*
* @return a {@link Mono} with the last value in this {@link Flux}
*/
public final Mono<T> last() {
和大理石图:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/doc-files/marbles/last.svg