在 ParallelFlux 完成时执行任务而不执行顺序(引入副作用)

Execute a task on completion of ParallelFlux without executing sequential (introduce a side affect)

我有一个ParallelFlux,想在所有rails中的所有组件都被消耗时执行副作用操作。我试图使用 .then().

但是看不懂怎么用

任何人都可以分享它的用法或在所有元素都经过 OnError、OnComplete rails 后执行副作用的方法。

指示代码:

RunTransformation provides a Parallel Flux in transformation,

OnCompletion mark record as completed in a separate registry.

RunAction does some action for each transformed record (independent of the other).

RunError handles error.

这里我想 运行 只在最终完成时运行完成,但必须顺序执行,尽管消费者可以并行完成。

   Mono.just(record)
       .flatMap(RunTransformation::tranformParallel)   //gives back ParallelFlux running on Schedulers.random()
       .sequential()
       .doOnTerminate(OnCompletion::markRecordProcessed)
       .subscribe(
            RunAction::execute, 
            RunError::handleError);

按如下方式使用 .then()。

Mono.just(record)
        .flatMap(RunTransformation::tranformParallel)   //gives back ParallelFlux running on Schedulers.random()
        .doOnNext(RunAction::execute)
        .doOnError(RunError::handleError)
        .then()
        .doOnTerminate(() -> {System.out.println("all rails completed");})
        .subscribe(); 

摘自文档

If, once you process your sequence in parallel, you want to revert back to a “normal” Flux and apply the rest of the operator chain in a sequential manner, you can use the sequential() method on ParallelFlux.

我认为 doOnComplete 是您要找的。

Flux.range(1, 10)
        .parallel(3)
        .runOn(Schedulers.parallel())
        .doOnNext(i -> System.out.println(Thread.currentThread().getName() + " -> " + i))
        .sequential()
        .doOnComplete(() -> System.out.println("All parallel work is done"))
        .subscribe()

这会产生输出:

parallel-1 -> 1
parallel-2 -> 2
parallel-3 -> 3
parallel-2 -> 5
parallel-3 -> 6
parallel-1 -> 4
parallel-1 -> 7
parallel-2 -> 8
parallel-3 -> 9
parallel-1 -> 10
All parallel work is done

Reactor documentation on parallel flux