SubscribeOn 不会改变整个链的线程池
SubscribeOn does not change the thread pool for the whole chain
我想通过休息请求和 WebFlux 触发更长的 运行ning 操作。调用的结果应该只是 return 操作已经开始的信息。我想在不同的调度程序(例如 Schedulers.single())上 运行 执行长 运行ning 操作。为此,我使用了 subscribeOn:
Mono<RecalculationRequested> recalculateAll() {
return provider.size()
.doOnNext(size -> log.info("Size: {}", size))
.doOnNext(size -> recalculate(size))
.map(RecalculationRequested::new);
}
private void recalculate(int toRecalculateSize) {
Mono.just(toRecalculateSize)
.flatMapMany(this::toPages)
.flatMap(page -> recalculate(page))
.reduce(new RecalculationResult(), RecalculationResult::increment)
.subscribeOn(Schedulers.single())
.subscribe(result -> log.info("Result of recalculation - success:{}, failed: {}",
result.getSuccess(), result.getFailed()));
}
private Mono<RecalculationResult> recalculate(RecalculationPage pageToRecalculate) {
return provider.findElementsToRecalculate(pageToRecalculate.getPageNumber(), pageToRecalculate.getPageSize())
.flatMap(this::recalculateSingle)
.reduce(new RecalculationResult(), RecalculationResult::increment);
}
private Mono<RecalculationResult> recalculateSingle(ElementToRecalculate elementToRecalculate) {
return recalculationTrigger.recalculate(elementToRecalculate)
.doOnNext(result -> {
log.info("Finished recalculation for element: {}", elementToRecalculate);
})
.doOnError(error -> {
log.error("Error during recalculation for element: {}", elementToRecalculate, error);
});
}
从上面我要调用:
private void recalculate(int toRecalculateSize)
在另一个线程中。但是,它不会 运行 在单个线程池上 - 它使用不同的线程池。我希望 subscribeOn 为整个链改变它。我应该更改什么以及为什么要在单线程池中执行它?
顺便提一下-方法:
provider.findElementsToRecalculate(...)
使用 WebClient 获取元素。
根据 documentation ,所有以 doOn 为前缀的运算符有时被称为具有“副作用”。它们让您可以在不修改序列事件的情况下窥视它们。
如果您想在 'provider.size()' 之后链接 'recalculate' 步骤,请使用 flatMap。
subscribeOn
的一个警告是它按照它说的做:它在提供的 Scheduler
上运行“订阅”行为。在运行时,订阅流从下到上(Subscriber
订阅其父 Publisher
)。
通常您会在文档和演示文稿中看到 subscribeOn
影响整个链条。这是因为大多数操作员/源自己不会更改线程,并且默认情况下会从订阅它们的线程开始发送 onNext
/onComplete
/onError
信号。
但是,一旦一个操作员在该自上而下的数据路径中切换线程,subscribeOn
的范围就会停在那里。典型的例子是当链中有一个publishOn
时。
本例中的数据源是 reactor-netty
和 netty
,它们在自己的线程上运行,因此就像在源处有一个 publishOn
一样。
对于 WebFlux,我建议在运算符的主链中使用 publishOn
,或者在内链中使用 subscribeOn
,例如 flatMap
.
我想通过休息请求和 WebFlux 触发更长的 运行ning 操作。调用的结果应该只是 return 操作已经开始的信息。我想在不同的调度程序(例如 Schedulers.single())上 运行 执行长 运行ning 操作。为此,我使用了 subscribeOn:
Mono<RecalculationRequested> recalculateAll() {
return provider.size()
.doOnNext(size -> log.info("Size: {}", size))
.doOnNext(size -> recalculate(size))
.map(RecalculationRequested::new);
}
private void recalculate(int toRecalculateSize) {
Mono.just(toRecalculateSize)
.flatMapMany(this::toPages)
.flatMap(page -> recalculate(page))
.reduce(new RecalculationResult(), RecalculationResult::increment)
.subscribeOn(Schedulers.single())
.subscribe(result -> log.info("Result of recalculation - success:{}, failed: {}",
result.getSuccess(), result.getFailed()));
}
private Mono<RecalculationResult> recalculate(RecalculationPage pageToRecalculate) {
return provider.findElementsToRecalculate(pageToRecalculate.getPageNumber(), pageToRecalculate.getPageSize())
.flatMap(this::recalculateSingle)
.reduce(new RecalculationResult(), RecalculationResult::increment);
}
private Mono<RecalculationResult> recalculateSingle(ElementToRecalculate elementToRecalculate) {
return recalculationTrigger.recalculate(elementToRecalculate)
.doOnNext(result -> {
log.info("Finished recalculation for element: {}", elementToRecalculate);
})
.doOnError(error -> {
log.error("Error during recalculation for element: {}", elementToRecalculate, error);
});
}
从上面我要调用:
private void recalculate(int toRecalculateSize)
在另一个线程中。但是,它不会 运行 在单个线程池上 - 它使用不同的线程池。我希望 subscribeOn 为整个链改变它。我应该更改什么以及为什么要在单线程池中执行它?
顺便提一下-方法:
provider.findElementsToRecalculate(...)
使用 WebClient 获取元素。
根据 documentation ,所有以 doOn 为前缀的运算符有时被称为具有“副作用”。它们让您可以在不修改序列事件的情况下窥视它们。
如果您想在 'provider.size()' 之后链接 'recalculate' 步骤,请使用 flatMap。
subscribeOn
的一个警告是它按照它说的做:它在提供的 Scheduler
上运行“订阅”行为。在运行时,订阅流从下到上(Subscriber
订阅其父 Publisher
)。
通常您会在文档和演示文稿中看到 subscribeOn
影响整个链条。这是因为大多数操作员/源自己不会更改线程,并且默认情况下会从订阅它们的线程开始发送 onNext
/onComplete
/onError
信号。
但是,一旦一个操作员在该自上而下的数据路径中切换线程,subscribeOn
的范围就会停在那里。典型的例子是当链中有一个publishOn
时。
本例中的数据源是 reactor-netty
和 netty
,它们在自己的线程上运行,因此就像在源处有一个 publishOn
一样。
对于 WebFlux,我建议在运算符的主链中使用 publishOn
,或者在内链中使用 subscribeOn
,例如 flatMap
.