没有跳过错误的 ConcatMap 选项 - RxJava

No option for ConcatMap with skip error - RxJava

考虑这个例子: 我有一个文件按顺序下载。如果一个下载失败,它应该移动到下一个。 伪代码:

Observable.from(urls)
 .concatMap(url -> downloadObservable(url)) 

如果下载失败,则没有移动到下一个的选项url。

无法使用 onErrorResumeNext() 跳过,因为我只想转到下一个 url。有人可以帮忙吗?

根据:https://github.com/ReactiveX/RxJava/issues/3870 没有办法做到这一点。当然你可以引入一些其他的错误处理,即在 downloadObservable 中处理错误然后过滤空答案。

如果您使用的是 RxJava 1,一个快速而肮脏的解决方案是 return null 当下载失败时,然后将它们过滤掉:

Observable
    .from(urls)
    .concatMap(url -> downloadObservable(url).onErrorReturn(null))
    .filter(result -> result != null)

一个更好的解决方案是为结果创建一个包装器,使用 wasSuccessful() 之类的方法来检查过滤器,并使用 getResult() 这样的方法从包装器中提取结果。这样您就不必处理空值。

你必须认为这是一个管道,所以如果你不想停止管道的发射,你必须控制错误和 return 一些东西才能继续下一个发射.

使用 onErrorResumeNext 并且之后不停止发射的唯一方法是在 flatMap 中执行

Observable.from(urls)
 .flatMap(url -> downloadObservable(url)
                   .onErrorResumeNext(t -> Observable.just("Something went wrong")))) 

你可以在这里看到一个例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

从 1.3 开始有一个运算符:concatMapDelayError。一般来说,如果有理由将错误延迟到所有源都被完全消耗掉,那么可能有一个 opNameDelayError 运算符。

Observable.from(urls)
    .concatMapDelayError(url -> downloadObservable(url))
    .doOnError(error -> {
        if (error instanceof CompositeException) {
           System.out.println(((CompositeException)error).getExceptions().size());
        } else {
            System.out.println(1);
        }
    });

doOnError 附录来自 RxJava 问题列表中更新的 OP cross post。)