如果发生错误,如何停止 Reactor flatMap
How to stop Reactor flatMap if error occurs
我正在使用 Reactor 链加载数据:
public Flux<Report> collectReport(List<MarkId> marks) {
return Flux.fromIterable(marks)
.flatMap(this::prepareRequest)
.collectList()
.flatMapIterable(e -> e)
.delayElements(Duration.ofMillis(200))
.concatMap(this::createReport)
.retryWhen(retryConfig)
.onErrorResume(throwable -> {
log.error(throwable.getMessage());
return Mono.empty();
});
.flatMap(response -> some actions here..//)
.buffer(1000)
.publishOn(Schedulers.newParallel("The rep saving", 4))
.flatMap(googleAnalyticsReports -> {
//saving to database here
}
);
}
concatMap(this::createReport)
可能会产生错误,将执行重试。因此,在对一个 ID 的重试未耗尽之前,应用程序将停止并且不会发送另一个请求。
为了提高加载速度,我决定将 concatMap
替换为 flatMap
。但是 flatMap
并不是那么可预测的。现在,如果发生错误,应用程序将继续从其他线程发送请求,而忽略一些 id 已经在 retry
情况下的事实,并且由于这种行为,我将继续从服务器(API 限制)之前收到 429 错误超时已激活。
所以,我的问题是:如果发生特定 id 的特定错误,我如何停止从 flatMap 加载数据并等待所有重试耗尽?如果我收到 429 错误,应用程序应停止加载当前 ID 的数据,然后转到重试案例,我可以等待超时。虽然未传递此 ID,但应用程序 不应为列表中的其他 ID 发送另一个请求。
flatMap
无法实现您正在尝试的操作,原因很简单 flatMap
急切地订阅内部流 - 也就是说,它不会等待一个流完成它订阅下一个。由于 createReport 是异步的,一旦工作被卸载到 I/O
线程,flatMap
就会从下一个元素创建一个新流并订阅它。将此与 concatMap
进行比较,后者在订阅下一个流之前等待内部流完成。
你想做的事情有点自相矛盾 - 你想提高加载速度(所以你想并行调用)但是你想对这些请求进行排序 b/w。您希望请求必须知道任何先前请求的状态。 flatMap
不提供任何顺序保证,因此您不能在此处使用此运算符。
我正在使用 Reactor 链加载数据:
public Flux<Report> collectReport(List<MarkId> marks) {
return Flux.fromIterable(marks)
.flatMap(this::prepareRequest)
.collectList()
.flatMapIterable(e -> e)
.delayElements(Duration.ofMillis(200))
.concatMap(this::createReport)
.retryWhen(retryConfig)
.onErrorResume(throwable -> {
log.error(throwable.getMessage());
return Mono.empty();
});
.flatMap(response -> some actions here..//)
.buffer(1000)
.publishOn(Schedulers.newParallel("The rep saving", 4))
.flatMap(googleAnalyticsReports -> {
//saving to database here
}
);
}
concatMap(this::createReport)
可能会产生错误,将执行重试。因此,在对一个 ID 的重试未耗尽之前,应用程序将停止并且不会发送另一个请求。
为了提高加载速度,我决定将 concatMap
替换为 flatMap
。但是 flatMap
并不是那么可预测的。现在,如果发生错误,应用程序将继续从其他线程发送请求,而忽略一些 id 已经在 retry
情况下的事实,并且由于这种行为,我将继续从服务器(API 限制)之前收到 429 错误超时已激活。
所以,我的问题是:如果发生特定 id 的特定错误,我如何停止从 flatMap 加载数据并等待所有重试耗尽?如果我收到 429 错误,应用程序应停止加载当前 ID 的数据,然后转到重试案例,我可以等待超时。虽然未传递此 ID,但应用程序 不应为列表中的其他 ID 发送另一个请求。
flatMap
无法实现您正在尝试的操作,原因很简单 flatMap
急切地订阅内部流 - 也就是说,它不会等待一个流完成它订阅下一个。由于 createReport 是异步的,一旦工作被卸载到 I/O
线程,flatMap
就会从下一个元素创建一个新流并订阅它。将此与 concatMap
进行比较,后者在订阅下一个流之前等待内部流完成。
你想做的事情有点自相矛盾 - 你想提高加载速度(所以你想并行调用)但是你想对这些请求进行排序 b/w。您希望请求必须知道任何先前请求的状态。 flatMap
不提供任何顺序保证,因此您不能在此处使用此运算符。