在 Flux doOnEach 中同步

Synchronize in Flux doOnEach

我们来看这个例子:

class ReactiveApp {

    static volatile int count = 0;
    static final Lock lock = new ReentrantLock();
    static final CountDownLatch latch = new CountDownLatch(2);

    public static void main(String[] args) throws Exception {

        Flux.range(0, 100_000)
                .parallel()
                .runOn(Schedulers.single())
                .doOnEach(i -> {
                    lock.lock();
                    count++;
                    lock.unlock();
                })
                .doOnComplete(() -> latch.countDown())
                .subscribe();

        Flux.range(0, 100_000)
                .parallel()
                .runOn(Schedulers.single())
                .doOnEach(i -> {
                    lock.lock();
                    count++;
                    lock.unlock();
                })
                .doOnComplete(() -> latch.countDown())
                .subscribe();

        latch.await();

        System.out.println(count);

    }
}

以上代码以同步方式在每个元素上递增 count。我希望 System.out.println(count); 打印 200000。我的代码有什么问题?

由于您实际上并没有在问题中具体说明问题,所以我假设这个数字比您预期的要小得多。

这是因为您在 parallelFlux 上使用了 onComplete。

Run the specified runnable when a 'rail' completes.

请注意,它说的是 'rail' 完成的时间,而不是所有 rails 完成的时间。

这意味着您的闩锁正在倒计时 'rail' 完成。

对于较小的数字,您可能会看到它是总数/数量 cpu 个核心(或 rails 的数量,如果明确设置)

由于多个 rails 运行 的比率略有不同,因此数字越大,它可能会更加随机。

无论如何,一种解决方案是在使用 sequential() 调用 onComplete 之前将 parallelFlux 重新合并在一起,因此只有当所有 rails 都完成时才会调用 runnable。

    Flux.range(0, 100_000)
            .parallel()
            .runOn(Schedulers.single())
            .doOnNext(i -> {
                lock.lock();
                count.incrementAndGet();
                lock.unlock();
            })
            .sequential()
            .doOnComplete(latch::countDown)
            .subscribe();

还有一些关于上面代码的其他注释

doOnNext 用于防止完成事件也增加计数。
count 我已更改为 AtomicInteger 并调用 incrementAndGet(),以提供更好的线程安全性