SubscribeOn 不会改变整个链的线程池

SubscribeOn does not change the thread pool for the whole chain

我想通过休息请求和 WebFlux 触发更长的 运行ning 操作。调用的结果应该只是 return 操作已经开始的信息。我想在不同的调度程序(例如 Schedulers.single())上 运行 执行长 运行ning 操作。为此,我使用了 subscribeOn:

Mono<RecalculationRequested> recalculateAll() {
  return provider.size()
      .doOnNext(size -> log.info("Size: {}", size))
      .doOnNext(size -> recalculate(size))
      .map(RecalculationRequested::new);
}

private void recalculate(int toRecalculateSize) {
  Mono.just(toRecalculateSize)
      .flatMapMany(this::toPages)
      .flatMap(page -> recalculate(page))
      .reduce(new RecalculationResult(), RecalculationResult::increment)
      .subscribeOn(Schedulers.single())
      .subscribe(result -> log.info("Result of recalculation - success:{}, failed: {}",
          result.getSuccess(), result.getFailed()));
}

private Mono<RecalculationResult> recalculate(RecalculationPage pageToRecalculate) {
  return provider.findElementsToRecalculate(pageToRecalculate.getPageNumber(), pageToRecalculate.getPageSize())
      .flatMap(this::recalculateSingle)
      .reduce(new RecalculationResult(), RecalculationResult::increment);
}

private Mono<RecalculationResult> recalculateSingle(ElementToRecalculate elementToRecalculate) {
  return recalculationTrigger.recalculate(elementToRecalculate)
      .doOnNext(result -> {
        log.info("Finished recalculation for element: {}", elementToRecalculate);
      })
      .doOnError(error -> {
        log.error("Error during recalculation for element: {}", elementToRecalculate, error);
      });
}

从上面我要调用:

private void recalculate(int toRecalculateSize)

在另一个线程中。但是,它不会 运行 在单个线程池上 - 它使用不同的线程池。我希望 subscribeOn 为整个链改变它。我应该更改什么以及为什么要在单线程池中执行它?

顺便提一下-方法:

provider.findElementsToRecalculate(...)

使用 WebClient 获取元素。

根据 documentation ,所有以 doOn 为前缀的运算符有时被称为具有“副作用”。它们让您可以在不修改序列事件的情况下窥视它们。

如果您想在 'provider.size()' 之后链接 'recalculate' 步骤,请使用 flatMap。

subscribeOn 的一个警告是它按照它说的做:它在提供的 Scheduler 上运行“订阅”行为。在运行时,订阅流从下到上(Subscriber 订阅其父 Publisher)。

通常您会在文档和演示文稿中看到 subscribeOn 影响整个链条。这是因为大多数操作员/源自己不会更改线程,并且默认情况下会从订阅它们的线程开始发送 onNext/onComplete/onError 信号。

但是,一旦一个操作员在该自上而下的数据路径中切换线程,subscribeOn 的范围就会停在那里。典型的例子是当链中有一个publishOn时。

本例中的数据源是 reactor-nettynetty,它们在自己的线程上运行,因此就像在源处有一个 publishOn 一样。

对于 WebFlux,我建议在运算符的主链中使用 publishOn,或者在内链中使用 subscribeOn,例如 flatMap.