publishOn vs subscribeOn in Project Reactor 3

I am using publishOn and subscribeOn on the same Flux, as shown below:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");

However, when I use both together, nothing is printed in the log. When I use only publishOn, I get the following info log:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------

Is publishOn recommended over subscribeOn? Or does it take precedence over subscribeOn? What is the difference between the two, and when should each be used?

Here is a short piece of documentation I found:

publishOn applies in the same way as any other operator, in the middle of the subscriber chain. It takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators will execute (until another publishOn is chained in).

subscribeOn applies to the subscription process, when that backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it always affects the context of the source emission. However, this does not affect the behavior of subsequent calls to publishOn. They still switch the execution context for the part of the chain after them.
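The two paragraphs above can be checked with a small experiment. The following is a minimal sketch, not code from the documentation: the class name, the scheduler names "sub" and "pub", and the stage labels are all made up for illustration. It places subscribeOn at the very end of the chain and records which thread each map runs on.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnPlacementDemo {

    // Runs one pipeline and returns entries of the form "stage@threadName".
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();      // thread-safe recorder
        Scheduler sub = Schedulers.newSingle("sub");           // hypothetical name
        Scheduler pub = Schedulers.newSingle("pub");           // hypothetical name
        try {
            Flux.just(1, 2, 3)
                .map(i -> {
                    // Upstream of publishOn: expected on the subscribeOn scheduler.
                    seen.add("beforePublishOn@" + Thread.currentThread().getName());
                    return i * 2;
                })
                .publishOn(pub)
                .map(i -> {
                    // Downstream of publishOn: expected on the publishOn scheduler.
                    seen.add("afterPublishOn@" + Thread.currentThread().getName());
                    return i;
                })
                .subscribeOn(sub)   // placed last, yet it decides where the source emits
                .blockLast();       // wait on the calling thread until the Flux completes
        } finally {
            // newSingle creates non-daemon threads, so release them explicitly.
            sub.dispose();
            pub.dispose();
        }
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Under these assumptions, every "beforePublishOn" entry should carry a thread name starting with "sub-" and every "afterPublishOn" entry one starting with "pub-", even though subscribeOn appears after publishOn in the chain.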

publishOn forces the next operator (and possibly subsequent operators after the next one) to run on a different thread. Similarly, subscribeOn forces the previous operator (and possibly operators prior to the previous one) to run on a different thread.
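Because both operators move work off the calling thread, subscribe() returns immediately, and the default Schedulers use daemon threads. One plausible reason the combined snippet in the question logged nothing is therefore that the calling code finished (or the JVM exited) before the parallel worker emitted anything; with publishOn alone, log() sits upstream of the thread switch and still runs synchronously on the caller. The sketch below is an assumption-laden variant of the question's code: the class name and the latch are additions, and Schedulers.boundedElastic() stands in for Schedulers.elastic(), which later Reactor 3 releases deprecate.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class WaitForAsyncPipeline {

    static List<Integer> run() throws InterruptedException {
        // elements::add is called from a scheduler thread, so use a thread-safe list.
        List<Integer> elements = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flux.just(1, 2, 3, 4)
            .map(i -> i * 2)
            .log()
            .publishOn(Schedulers.boundedElastic())   // downstream signals hop here
            .subscribeOn(Schedulers.parallel())       // source emission starts here
            .doFinally(signal -> done.countDown())    // fires after complete, error or cancel
            .subscribe(elements::add);

        // Without some form of waiting, the caller may return before anything is emitted.
        done.await(5, TimeUnit.SECONDS);
        return elements;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

In a test or a main method, blockLast() is a simpler way to wait; the latch is used here only to keep the subscribe(elements::add) call from the question unchanged.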

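It took me some time to understand this, possibly because publishOn is usually explained before subscribeOn, so here is a hopefully simpler layman's explanation.

subscribeOn means that the initial source emission, e.g. subscribe(), onSubscribe() and request(), runs on a worker of the specified scheduler (another thread), and so does everything that follows, e.g. onNext/onError/onComplete, map, etc. This happens no matter where subscribeOn() is placed in the chain.

If you do not call publishOn anywhere in the fluent chain, that is all there is to it: everything runs on that thread.

But as soon as you call publishOn() somewhere in the middle, every subsequent operator call runs on a worker of the scheduler supplied to that publishOn().

Here is an example: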

Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
        .doOnNext(consumer)
        .map(i -> {
          System.out.println("Inside map the thread is " + Thread.currentThread().getName());
          return i * 10;
        })
        .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
        .doOnNext(consumer)
        .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
        .doOnNext(consumer)
        .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
        .subscribe();

The result will be:


1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5

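As you can see, the first doOnNext() and the map() that follows it run on the thread named subscribeOn_thread until a publishOn() is reached. From that point on, subsequent calls run on the scheduler supplied to that publishOn(), and the same happens again for every later call until another publishOn() is reached.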

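The following is excerpted from the excellent blog post https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers

publishOn

This is the basic operator you need when you want to hop threads. Incoming signals from its source are published on the given Scheduler, effectively switching threads to one of that scheduler's workers.

This is valid for the onNext, onComplete and onError signals, that is, signals that flow from an upstream source to a downstream subscriber.

So in essence, every processing step that appears below this operator executes on the new Scheduler, until another operator switches again (e.g. another publishOn).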

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));

Output

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

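subscribeOn

This operator changes where the subscribe method is executed. And since the subscribe signal flows upward, it directly influences where the source Flux subscribes and starts generating data.

As a consequence, it can seem to act on the parts of the reactive chain of operators both upward and downward (as long as there is no publishOn thrown in the mix):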

final Flux<String> fetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
           .map(url -> blockingWebClient.get(url));
}

// sample code (A to E stand for URL strings):
fetchUrls(List.of(A, B, C))
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

fetchUrls(List.of(D, E))
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));

Output

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
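The two excerpts rely on a blockingWebClient that is not shown. As a rough way to try them locally, here is a sketch in which a hypothetical blockingGet method (a short Thread.sleep, not a real HTTP call) stands in for it. The class and method names are assumptions, and blockLast() is used so the calling thread waits for the result.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class BlockingFetchDemo {

    // Hypothetical stand-in for blockingWebClient.get(url): sleeps instead of doing I/O.
    static String blockingGet(String url) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "body of " + url;
    }

    // publishOn before map: the blocking call runs on a boundedElastic worker.
    static List<String> withPublishOn(List<String> urls) {
        List<String> threads = new CopyOnWriteArrayList<>();
        Flux.fromIterable(urls)
            .publishOn(Schedulers.boundedElastic())
            .map(url -> {
                threads.add(Thread.currentThread().getName());
                return blockingGet(url);
            })
            .blockLast();
        return threads;
    }

    // subscribeOn after map: the whole chain, source included, runs on a boundedElastic worker.
    static List<String> withSubscribeOn(List<String> urls) {
        List<String> threads = new CopyOnWriteArrayList<>();
        Flux.fromIterable(urls)
            .map(url -> {
                threads.add(Thread.currentThread().getName());
                return blockingGet(url);
            })
            .subscribeOn(Schedulers.boundedElastic())
            .blockLast();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("publishOn:   " + withPublishOn(List.of("A", "B", "C")));
        System.out.println("subscribeOn: " + withSubscribeOn(List.of("D", "E")));
    }
}
```

In both variants the recorded thread names should start with "boundedElastic-", which matches the identical outputs shown for the two excerpts. The difference only becomes visible once other operators sit upstream of publishOn, since those would stay on the subscribing thread.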