Parallelize the filter operation

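I'm trying to parallelize the filter operation on a Flux. However, judging by how long the operation takes to complete, it doesn't seem to be running in parallel. Any insight into what I might be doing wrong here would be greatly appreciated. Thanks.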

    @Test
    public void testParallelFilteringFlux() {
        long start = Calendar.getInstance().getTimeInMillis();
        log.info("Start time ::{}",Calendar.getInstance().getTimeInMillis());
        Flux<Integer> fluxFromJust = Flux.range(1, 1000000);
        ParallelFlux<Integer> pfilter = fluxFromJust.filter(i -> i == 99999).parallel(4).runOn(Schedulers.parallel()); // keep only the value 99999
        Flux<Integer> filter = fluxFromJust.filter(i -> i == 99999);
        filter.subscribe(i->log.info(">>>>>>>>> Found Integer: {}, time: {}",i, Calendar.getInstance().getTimeInMillis() - start));
        pfilter.subscribe(i->log.info(">>>>>>>>> Parallel Found Integer: {}, time: {}",i, Calendar.getInstance().getTimeInMillis() - start));
    }

The output is:

20:37:29.733 [main] INFO test.ReactorTest - Start time ::1614092849730
20:37:30.040 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:37:30.107 [main] INFO test.ReactorTest - >>>>>>>>> Found Integer: 99999, time: 377
20:37:30.190 [parallel-1] INFO test.ReactorTest - >>>>>>>>> Parallel Found Integer: 99999, time: 460

Process finished with exit code 

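It does run in parallel (the parallel result is logged from the `parallel-1` thread), but several things explain why the parallel version takes longer in your test. I'll try to explain them.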

First, your test is not accurate, because:

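  • the two versions are not timed independently. Both measure elapsed time from the same `start`, and since `filter.subscribe(...)` runs synchronously on `main` until it completes, the parallel subscription only starts afterwards, so the reported parallel time also includes the whole single-threaded run. For meaningful numbers, time each version on its own, one after the other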
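  • you should run the test at least twice, because on the first run several things are not initialized yet, so the timing includes class loading, scheduler initialization, etc. (your log even shows Reactor setting up its logging inside the timed window). These costs should not be taken into account when comparing the two solutions.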
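Both points boil down to timing each version on its own, after a few untimed warm-up runs. The sketch below is plain Java under that assumption; the class `WarmupTimer` and the method `timeMillis` are hypothetical names for this illustration. It uses `System.nanoTime()`, which is intended for measuring elapsed time. For serious measurements, a benchmarking harness such as JMH is a better tool than a hand-rolled timer.

```java
import java.util.function.Supplier;

public class WarmupTimer {

    /**
     * Calls {@code task} {@code warmups} times without timing it, so that one-off
     * costs (class loading, JIT compilation, scheduler start-up) are paid first,
     * then times one more call and returns the elapsed time in milliseconds.
     */
    static <T> long timeMillis(Supplier<T> task, int warmups) {
        for (int i = 0; i < warmups; i++) {
            task.get(); // warm-up call, result ignored
        }
        long start = System.nanoTime(); // meant for elapsed-time measurement
        task.get();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Time a plain sequential search over 1..1_000_000 after two warm-up calls.
        long ms = timeMillis(() -> {
            int found = -1;
            for (int i = 1; i <= 1_000_000; i++) {
                if (i == 99_999) {
                    found = i;
                }
            }
            return found;
        }, 2);
        System.out.println("measured call: " + ms + " ms");
    }
}
```

With Reactor, the task would be something like `() -> testParallel(nb, predicate).collectList().block()`, reusing the `testParallel` method from the rewritten test.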

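But that is not the most important point. Processing the filter in parallel actually requires more work behind the scenes, to split the data and dispatch it to the different threads. Because the predicate in your filter is very simple (just a comparison), it ends up being more efficient to do everything at once in a single thread than in parallel. If the filter's processing time were more significant, the parallel approach would become more efficient, because that processing time would be (more or less) divided by the number of parallel threads.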
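As a rough back-of-the-envelope model (my own simplification, not something taken from the Reactor documentation): let $n$ be the number of elements, $c$ the cost of one predicate call, $p$ the number of rails, and $d$ the extra per-element cost of splitting the data and dispatching it to another thread. Then:

```latex
T_{\text{single}} \approx n\,c, \qquad
T_{\text{parallel}} \approx \frac{n\,c}{p} + n\,d, \qquad
T_{\text{parallel}} < T_{\text{single}} \iff c\left(1 - \frac{1}{p}\right) > d
```

With a single integer comparison, $c$ is much smaller than $d$, so the parallel version loses; with a predicate that sleeps for about a millisecond, $c$ dominates and the time approaches $n\,c/p$.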

I rewrote your test to illustrate these points:

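  • I run each test twice, to avoid counting initialization in the timings
  • I wait for the single-threaded test to finish (with `block()`) before starting the parallel test, so they don't interfere
  • I moved `filter` after `parallel(4).runOn(Schedulers.parallel())`. In your code the filter is applied before `parallel(4)`, so the predicate is still evaluated sequentially, upstream of the split; only the operators placed after `runOn` run on the `Schedulers.parallel()` workers. `sequential()` then merges the rails back into a single `Flux`
  • Finally, I tested a very simple predicate (as in your test) and a longer one (I just sleep to simulate longer processing). Note that I reduced the number of items for the longer predicate to get results faster.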
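To see what the position of `filter` changes, here is a sketch that counts how many predicate calls run at the same time in each layout: `filter` before `parallel(4)` (as in the original test) and `filter` after `runOn(...)`. It assumes reactor-core on the classpath; `FilterPlacement`, `ConcurrencyProbe`, `slowIsTarget`, `maxConcurrency` and the target value 199 are hypothetical names and values for this illustration. In the first layout the maximum should be 1, because Reactive Streams requires `onNext` signals to be delivered serially, so an operator upstream of the split never handles two elements at once. In the second layout each rail filters on its own worker, so on a multi-core machine the maximum is typically greater than 1 (`Schedulers.parallel()` has as many workers as CPU cores by default).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FilterPlacement {

    // Hypothetical helper: wraps a predicate and records the highest number
    // of calls that were running at the same time.
    static final class ConcurrencyProbe {
        final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger max = new AtomicInteger();

        Predicate<Integer> wrap(Predicate<Integer> p) {
            return i -> {
                int now = inFlight.incrementAndGet();
                max.accumulateAndGet(now, Math::max);
                try {
                    return p.test(i);
                } finally {
                    inFlight.decrementAndGet();
                }
            };
        }
    }

    // Slow predicate, simulated with Thread.sleep(1) as in the rewritten test.
    static boolean slowIsTarget(int i) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i == 199;
    }

    /** Returns the maximum number of concurrent predicate calls observed. */
    static int maxConcurrency(boolean filterBeforeParallel) {
        ConcurrencyProbe probe = new ConcurrencyProbe();
        Predicate<Integer> p = probe.wrap(FilterPlacement::slowIsTarget);
        Flux<Integer> source = Flux.range(1, 200);
        if (filterBeforeParallel) {
            // Layout of the question: the filter sits upstream of the split.
            source.filter(p)
                  .parallel(4)
                  .runOn(Schedulers.parallel())
                  .sequential()
                  .blockLast();
        } else {
            // Layout of the rewritten test: each rail filters on its own worker.
            source.parallel(4)
                  .runOn(Schedulers.parallel())
                  .filter(p)
                  .sequential()
                  .blockLast();
        }
        return probe.max.get();
    }

    public static void main(String[] args) {
        System.out.println("filter before parallel(): max concurrent calls = " + maxConcurrency(true));
        System.out.println("filter after runOn():     max concurrent calls = " + maxConcurrency(false));
    }
}
```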

Here is the code:

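    import java.util.List;
    import java.util.function.Predicate;

    import org.junit.jupiter.api.Test; // assumes JUnit 5; use org.junit.Test with JUnit 4
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.ParallelFlux;
    import reactor.core.scheduler.Schedulers;

    // The methods below go inside your test class.
    @Test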
    public void testParallelFilteringFlux() throws Exception {
        Predicate<Integer> predicate;
        System.out.println("Test with short process in the predicate");
        final int nb1 = 1000000;
        predicate = i -> i == nb1 - 1;
        runTest(nb1, predicate);
        runTest(nb1, predicate);
        System.out.println("Test with longer process in the predicate");
        final int nb2 = 10000;
        predicate = i -> {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag rather than silently ignoring it
            }
            return i == nb2 - 1;
        };
        runTest(nb2, predicate);
        runTest(nb2, predicate);
    }
    
    private void runTest(int nb, Predicate<Integer> predicate) {
        long start = System.currentTimeMillis();
        List<Integer> result = testSingleThread(nb, predicate).collectList().block();
        System.out.println("Found with single thread " + result + " in " + (System.currentTimeMillis() - start) + "ms.");
        
        start = System.currentTimeMillis();
        result = testParallel(nb, predicate).collectList().block();
        System.out.println("Found with parallel " + result + " in " + (System.currentTimeMillis() - start) + "ms.");
    }
    
    private Flux<Integer> testSingleThread(int nb, Predicate<Integer> predicate) {
        Flux<Integer> fluxFromJust = Flux.range(1, nb);
        Flux<Integer> filter = fluxFromJust.filter(predicate);
        return filter;
    }
    
    private Flux<Integer> testParallel(int nb, Predicate<Integer> predicate) {
        Flux<Integer> fluxFromJust = Flux.range(1, nb);
        ParallelFlux<Integer> pfilter = fluxFromJust.parallel(4).runOn(Schedulers.parallel()).filter(predicate);
        return pfilter.sequential();
    }

Here is the output:

Test with short process in the predicate
Found with single thread [999999] in 126ms.
Found with parallel [999999] in 326ms.
Found with single thread [999999] in 6ms.
Found with parallel [999999] in 191ms.
Test with longer process in the predicate
Found with single thread [9999] in 17474ms.
Found with parallel [9999] in 4528ms.
Found with single thread [9999] in 17575ms.
Found with parallel [9999] in 4563ms.

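As you can see, with the short predicate the single-threaded test is faster, but when the predicate takes longer to process, the time is divided by almost 4 (the number of rails passed to `parallel(4)`).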
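Checking the longer-predicate timings above against a division by 4 (simple arithmetic on the reported numbers):

```latex
\frac{17474\ \text{ms}}{4} = 4368.5\ \text{ms}\ \ (\text{measured: } 4528\ \text{ms}), \qquad
\frac{17474}{4528} \approx 3.86, \qquad
\frac{17575}{4563} \approx 3.85
```

The speed-up stays a little under 4, presumably because some dispatch overhead remains. Note also that 17474 ms for 10000 elements is about 1.75 ms per element, so each `Thread.sleep(1)` call took noticeably longer than 1 ms.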