在 Flux doOnEach 中同步
Synchronize in Flux doOnEach
我们来看这个例子:
class ReactiveApp {
static volatile int count = 0;
static final Lock lock = new ReentrantLock();
static final CountDownLatch latch = new CountDownLatch(2);
public static void main(String[] args) throws Exception {
Flux.range(0, 100_000)
.parallel()
.runOn(Schedulers.single())
.doOnEach(i -> {
lock.lock();
count++;
lock.unlock();
})
.doOnComplete(() -> latch.countDown())
.subscribe();
Flux.range(0, 100_000)
.parallel()
.runOn(Schedulers.single())
.doOnEach(i -> {
lock.lock();
count++;
lock.unlock();
})
.doOnComplete(() -> latch.countDown())
.subscribe();
latch.await();
System.out.println(count);
}
}
以上代码以同步方式在每个元素上递增 count
。我希望 System.out.println(count);
打印 200000
。我的代码有什么问题?
由于您实际上并没有在问题中具体说明问题,所以我假设这个数字比您预期的要小得多。
这是因为您在 parallelFlux 上使用了 onComplete。
Run the specified runnable when a 'rail' completes.
请注意,它说的是 'rail' 完成的时间,而不是所有 rails 完成的时间。
这意味着您的闩锁正在倒计时 'rail' 完成。
对于较小的数字,您可能会看到它是总数/数量 cpu 个核心(或 rails 的数量,如果明确设置)
由于多个 rails 运行 的比率略有不同,因此数字越大,它可能会更加随机。
无论如何,一种解决方案是在使用 sequential()
调用 onComplete 之前将 parallelFlux 重新合并在一起,因此只有当所有 rails 都完成时才会调用 runnable。
Flux.range(0, 100_000)
.parallel()
.runOn(Schedulers.single())
.doOnNext(i -> {
lock.lock();
count.incrementAndGet();
lock.unlock();
})
.sequential()
.doOnComplete(latch::countDown)
.subscribe();
还有一些关于上面代码的其他注释
doOnNext
用于防止完成事件也增加计数。
count
我已更改为 AtomicInteger 并调用 incrementAndGet()
,以提供更好的线程安全性
我们来看这个例子:
class ReactiveApp {
static volatile int count = 0;
static final Lock lock = new ReentrantLock();
static final CountDownLatch latch = new CountDownLatch(2);
public static void main(String[] args) throws Exception {
Flux.range(0, 100_000)
.parallel()
.runOn(Schedulers.single())
.doOnEach(i -> {
lock.lock();
count++;
lock.unlock();
})
.doOnComplete(() -> latch.countDown())
.subscribe();
Flux.range(0, 100_000)
.parallel()
.runOn(Schedulers.single())
.doOnEach(i -> {
lock.lock();
count++;
lock.unlock();
})
.doOnComplete(() -> latch.countDown())
.subscribe();
latch.await();
System.out.println(count);
}
}
以上代码以同步方式在每个元素上递增 count
。我希望 System.out.println(count);
打印 200000
。我的代码有什么问题?
由于您实际上并没有在问题中具体说明问题,所以我假设这个数字比您预期的要小得多。
这是因为您在 parallelFlux 上使用了 onComplete。
Run the specified runnable when a 'rail' completes.
请注意,它说的是 'rail' 完成的时间,而不是所有 rails 完成的时间。
这意味着您的闩锁正在倒计时 'rail' 完成。
对于较小的数字,您可能会看到它是总数/数量 cpu 个核心(或 rails 的数量,如果明确设置)
由于多个 rails 运行 的比率略有不同,因此数字越大,它可能会更加随机。
无论如何,一种解决方案是在使用 sequential()
调用 onComplete 之前将 parallelFlux 重新合并在一起,因此只有当所有 rails 都完成时才会调用 runnable。
Flux.range(0, 100_000)
.parallel()
.runOn(Schedulers.single())
.doOnNext(i -> {
lock.lock();
count.incrementAndGet();
lock.unlock();
})
.sequential()
.doOnComplete(latch::countDown)
.subscribe();
还有一些关于上面代码的其他注释
doOnNext
用于防止完成事件也增加计数。
count
我已更改为 AtomicInteger 并调用 incrementAndGet()
,以提供更好的线程安全性