如果发生错误,如何停止 Reactor flatMap

How to stop Reactor flatMap if error occurs

我正在使用 Reactor 链加载数据:

 public Flux<Report> collectReport(List<MarkId> marks) {
        return Flux.fromIterable(marks)
                .flatMap(this::prepareRequest)
                .collectList()
                .flatMapIterable(e -> e)
                .delayElements(Duration.ofMillis(200))
                .concatMap(this::createReport)
                .retryWhen(retryConfig)
                .onErrorResume(throwable -> {
                    log.error(throwable.getMessage()); 
                    return Mono.empty();
                });
                .flatMap(response -> some actions here..//)
                .buffer(1000)
                .publishOn(Schedulers.newParallel("The rep saving", 4))
                .flatMap(googleAnalyticsReports -> {
                            //saving to database here
                        }
                );
    }

concatMap(this::createReport) 可能会产生错误,将执行重试。因此,在对一个 ID 的重试未耗尽之前,应用程序将停止并且不会发送另一个请求。

为了提高加载速度,我决定将 concatMap 替换为 flatMap。但是 flatMap 并不是那么可预测的。现在,如果发生错误,应用程序将继续从其他线程发送请求,而忽略一些 id 已经在 retry 情况下的事实,并且由于这种行为,我将继续从服务器(API 限制)之前收到 429 错误超时已激活。

所以,我的问题是:如果发生特定 id 的特定错误,我如何停止从 flatMap 加载数据并等待所有重试耗尽?如果我收到 429 错误,应用程序应停止加载当前 ID 的数据,然后转到重试案例,我可以等待超时。虽然未传递此 ID,但应用程序 不应为列表中的其他 ID 发送另一个请求

flatMap 无法实现您正在尝试的操作,原因很简单 flatMap 急切地订阅内部流 - 也就是说,它不会等待一个流完成它订阅下一个。由于 createReport 是异步的,一旦工作被卸载到 I/O 线程,flatMap 就会从下一个元素创建一个新流并订阅它。将此与 concatMap 进行比较,后者在订阅下一个流之前等待内部流完成。

你想做的事情有点自相矛盾 - 你想提高加载速度(所以你想并行调用)但是你想对这些请求进行排序 b/w。您希望请求必须知道任何先前请求的状态。 flatMap 不提供任何顺序保证,因此您不能在此处使用此运算符。