你有测试来显示 reactor map() 和 flatMap() 之间的区别吗?

Do you have a test to show differences between the reactor map() and flatMap()?

我仍在尝试理解 reactor map() 和 flatMap() 方法之间的区别。 首先我看了一下 API,但它并没有真正的帮助,它让我更加困惑。 然后我在谷歌上搜索了很多,但似乎没有人有一个例子来解释这些差异,如果有任何差异的话。

因此我尝试编写两个测试来查看每种方法的不同行为。 但不幸的是,它没有像我希望的那样工作...

第一个测试方法是测试响应式 flatMap() 方法:

@Test
void fluxFlatMapTest() {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .window(2)
            .flatMap(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
            .doOnNext(System.out::println)
            .subscribe();
}

输出符合预期,可解释,看起来像这样:

9 - parallel-2
1 - parallel-1
4 - parallel-1
25 - parallel-3
36 - parallel-3
49 - parallel-4
64 - parallel-4
81 - parallel-5
100 - parallel-5
16 - parallel-2

第二种方法应该测试 map() 方法的输出,以与 flatMap() 方法的上述结果进行比较。

@Test
void fluxMapTest() {
    final int start = 1;
    final int stop = 100;
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .window(2)
            .map(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
            .doOnNext(System.out::println)
            .subscribe();
}

这个测试方法有输出,我完全没想到,看起来像这样:

FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn

有一个看起来像这样的小辅助方法:

private String processNumber(Integer x) {
    String squaredValueAsString = String.valueOf(x * x);
    return squaredValueAsString.concat(" - ").concat(Thread.currentThread().getName());
}

没什么特别的。

我正在使用 Spring Boot 2.3.4 和 Java 11 以及 Spring 的反应器实现。

您是否有很好的解释示例,或者您知道如何更改上述测试以使它们有意义吗? 那么请帮我解决这个问题。 非常感谢!

Reactor 是 Webflux 中的底层库,由一个叫做 event loop 的东西组成,我相信它是基于一个叫做 LMAX Architecture.

的架构的

这意味着 event loop 是单线程事件处理器。事件循环之前的一切都可以是多线程的,但事件本身由单个线程处理。 The event loop.

常规 spring 引导应用程序通常 运行 使用服务器 tomcat 或 undertow,而 webflux 默认 运行 由事件驱动服务器 Netty,它反过来使用这个 event loop 来为我们处理事件。

现在我们了解了一切背后的内容,我们可以开始讨论 mapflatMap

地图

如果我们查看 api,我们可以看到下图:

并且 api 文字说:

Transform the items emitted by this Flux by applying a synchronous function to each item.

这是不言自明的。我们有 Flux 个项目,每次 map 请求处理一个项目时,它不会请求另一个项目,直到它处理完第一个项目。因此 同步.

图片显示,需要将绿色圆圈转为绿色方块,直到我们可以要求将黄色圆圈转为黄色方块...等等等等

这是一个代码示例:

Flux.just("a", "b", "c")
    .map(value -> value.toUppercase())
    .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
A - main
B - main
C - main

每个都在主线程上运行,并在彼此之后同步处理。

平面地图

如果我们查看 api,我们可以看到下图:

文字说:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

它基本上使用三个步骤来完成此操作:

  • 内部和订阅的生成:此运算符正在急切地订阅其内部。
  • 展平值的排序:此运算符不一定保留原始顺序,因为内部元素在到达时已展平。
  • 交错:此运算符允许来自不同内部序列的值交错(类似于合并内部序列)。

那么这是什么意思呢?好吧,它基本上意味着:

  1. 它将获取流中的每个项目,并将其转换为个人Mono(发布者),每个项目中有一个项目。

  2. 在处理项目时对其进行排序,flatMap NOT 保留顺序,因为项目可以在事件循环中以不同的时间处理。

  3. 将所有已处理的项目合并回 Flux 以便进一步处理。

这是一个代码示例:

Flux.just("a", "b", "c")
        .flatMap(value -> Mono.just(value.toUpperCase()))
         .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
A - main
B - main
C - main

等待 flatMap 打印与地图相同的内容!

嗯,这一切又回到了我们之前谈到的线程模型。实际上只有一个称为事件循环的线程处理所有事件。

Reactor 是并发不可知的,这意味着任何 worker 都可以安排由 event loop 处理的作业。

那么什么是 worker 那么 worker 就是 scheduler 可以产生的东西。还有一件重要的事情是,worker 不一定是线程,它可以是,但不一定是。

在上面的代码案例中,主线程订阅了我们的流量,这意味着主线程会为我们处理这个并安排事件循环处理的工作。

在服务器环境中,情况不一定如此。 这里要理解的重要一点是,如果需要,reactor 可以随时切换 workers(又名可能的线程)

在我上面的代码示例中只有一个主线程,因此不需要 运行 多线程上的东西,或者并行执行。

如果我想强制执行它,我可以使用不同的调度程序之一,它们都有各自的用途。在 Netty 中,服务器将启动与您机器上的内核相同数量的事件循环线程,因此它可以在重负载时根据需要自由切换工作程序和内核,以最大限度地利用所有事件循环..

FlatMap 异步并不意味着并行,这意味着它将安排事件循环同时处理所有事情,但它仍然只有一个线程执行任务.

并行执行

如果我真的想并行执行某些事情,您可以将某些事情放在并行调度程序上。这意味着它将保证多个 workers 在多个内核上。但是请记住,当您的程序是 运行 时,这有一个设置时间,这通常只有在您有繁重的计算内容而需要大量单核 CPU 能力时才有用。

代码示例:

Flux.just("a", "b", "c")
    .flatMap(value -> value -> Mono.just(value.toUpperCase()))
    .subscribeOn(Schedulers.parallel())
    .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
A - parallel-1
B - parallel-1
C - parallel-1

这里我们仍然运行只关注一个线程,因为subscribeOn意味着当一个线程订阅时Scheduler将从调度程序池中选择一个线程然后坚持它在整个执行过程中。

如果我们确实想要在多个线程上强制执行,例如我们可以使用并行通量。

Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

// Output
parallel-3 -> 2
parallel-2 -> 1
parallel-3 -> 4
parallel-2 -> 3
parallel-3 -> 6
parallel-2 -> 5
parallel-3 -> 8
parallel-2 -> 7
parallel-3 -> 10
parallel-2 -> 9

但请记住,在大多数情况下这是没有必要的。有一个设置时间,这种类型的执行通常只有在你有很多 cpu 繁重的任务时才有用。否则,在大多数情况下,使用默认的事件循环单线程“可能”会更快。

处理很多 i/o 任务,通常更多的是关于编排,而不是原始的 CPU 能力。


此处大部分信息取自 FluxMono api.

Reactor documentation 是一个了不起且有趣的信息来源。

还有 Simon Baslé 的博客系列 Flight of the flux is also a wonderful and interesting read. It also exists in Youtube format

这里和那里也有一些错误,我也做了一些假设,尤其是当涉及到 Reactor 的内部工作时。但希望这至少能理清一些想法。

如果有人觉得事情有直接错误,请随时编辑。