Mono.fromCallable 线程行为

Mono.fromCallable Threading Behaviour

我想了解 Reactor 中阻塞函数的行为,但其他一些事情让我完全无法学习。这是代码:

  public static void main(String[] args) throws InterruptedException {
    Flux.range(1, 100_000)
        .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
        .flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic()))
        .subscribe();
    System.out.println("Here");
    Thread.sleep(Integer.MAX_VALUE);
  }

  private static int blockingMethod(int s) {
    try {
      Thread.sleep(100_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return s;
  }

以下是 AFAIK 所发生情况的摘要:

  1. 订阅发生在 main 线程上。

  2. mainflatMap 中变得自由,以从上游带来下一个元素。因此,doOnNext 应该总是打印 main.

  3. 处理完 100_000 个元素后,main 将变为空闲并打印 here.

相反,情况是这样的:

  1. 前 256 个元素按预期打印在 main(在 doOnNext 中)。

  2. 大约 1 秒后,下一个 256,然后下一个,依此类推。从第二批开始的元素打印在 elastic 个线程上。

这是我的问题:

  1. 为什么要分批处理 256 个元素? Schedulers.elastic() 应该按需创建线程,因此理想情况下应该始终有一个线程可用于从 main 获取请求(忽略 JVM 对我可以创建的线程数的限制)。

  2. 为什么第二个 'batch'(及以后)中的元素被打印在 elastic 个线程上?我希望它们在 main 上发布。事实上,当您删除阻塞调用时会发生这种情况

       public static void main(String[] args) throws InterruptedException {
           Flux.range(1, 100_000)
               .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
               .flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic()))
               .subscribe();
          System.out.println("Here");
          Thread.sleep(Integer.MAX_VALUE);
      }
    

此处,所有元素在 doOnNext 中打印 main 并且 here 仅在流完成时打印(释放主线程)。

我是不是漏掉了什么?

好的。让我们通过添加新语句来修改您的示例:

public static void main(String[] args) throws InterruptedException {
       Flux.range(1, 100_000)
           .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName())) // #4
           .flatMap(a -> Mono.just(a)
               .subscribeOn(Schedulers.elastic())) // #2
           .subscribeOn(Schedulers.immediate()) // NEW, #1
           .subscribe(); // #3
      System.out.println("Here");
      Thread.sleep(Integer.MAX_VALUE);
  }

默认情况下,此订阅链将在当前线程中执行。医生说 Mono::subscribeOn(Scheduler):

Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.

和反应堆参考:

subscribeOn applies to the subscription process, when that backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it always affects the context of the source emission.

有了这些知识,我们可以读出语句:

  1. 在第 3 行我们订阅了 Flux;
  2. 在第 1 行中,我们成功地更改了数字发射的上下文;
  3. flatMap一致,我们没有任何可见的工作,因为它将在数字发布后工作;
  4. 同时 flatMap 请求 Integer.MAX_VALUE(默认情况下)从 Flux::range 中排出元素,并且 Flux::range 开始发射到 256 一次(因为flatMap有背压);
  5. 第 4 行中的回调将使用当前线程,因为第 2 行中的 subscribeOn 尚未执行;
  6. 第 2 行中的新值 subscribeOn 将为每个元素调用并为每个值选择一个线程。从这一刻起,所有工作都将在 Schedulers::elastic 中完成。因为 #2 比 #1 更接近发布者,#2 将更改源发射的上下文,所有上游工作将使用此调度程序代替 #1;
  7. 退出subscribe

我们可以提取两条规则:

  1. 如果 subscribeOn 在时间上比其他 subscribeOn 晚执行 - 所有上游都将使用它;
  2. 如果 subscribeOn 比其他时间执行得早 - 可以用 hidden 转换替换为其他 subscribeOn.

您需要了解,整个反应器都建立在 reactive streams 规范之上。 因此,上面的每个运算符都是发布者和订阅者的组合。

场景 2

   public static void main(String[] args) throws InterruptedException {
       Flux.range(1, 100_000)
           .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
           .flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic()))
           .subscribe();
      System.out.println("Here");
      Thread.sleep(Integer.MAX_VALUE);
  }

发布者在主线程 (Flux.Range) 上启动,订阅者 (doOnNext) 正在订阅(在主线程上调用 onNext),它被委托给一个比订阅者可以发布。 所以整个 Flux.Range 在主线程上被调用并被分配到弹性线程池来处理

场景 1

  public static void main(String[] args) throws InterruptedException {
    Flux.range(1, 100_000)
        .doOnNext(a -> System.out.println(a + ", Publisher: " + Thread.currentThread().getName()))
        .flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic())).doOnNext(a -> System.out.println(a + ", Subscriber: " + Thread.currentThread().getName()))
        .subscribe();
    System.out.println("Here");
    Thread.sleep(Integer.MAX_VALUE);
  }

  private static int blockingMethod(int s) {
    try {
      Thread.sleep(100_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return s;
  }

Flux.Range 的前 256 次调用发生在主线程上。请记住,reactive streams 是一种背压驱动的编程,由于平面地图只能合并 256 个上游,平面地图操作员(它是订阅者)不会在 256 个事件后调用 onNext 调用,因为它们都是 blocked/waiting 完成。

现在主线程被释放了,因为它完成了参与反应管道的责任并走出了传送带。即基本上继续执行下一行代码。 这就是为什么您会在前 256 个元素之后看到“这里”的原因。 然后主线程继续按照代码无限期阻塞。

一旦其中一个阻塞线程完成作业,它们将触发订阅者在完成较早请求的线程(即弹性线程)上调用 onNext。 弹性线程现在会将调用委托给内部反应器管道中的另一个弹性线程。

这意味着对于我发出的同一个项目,打印“Publisher”的线程和打印“Subscriber”的线程是不一样的,即使它们是弹性线程。 欢迎验证以下代码