如何使用 RxJava 2 来安排重试比赛?

How do I use RxJava 2 to orchestrate a race with retry?

假设我们有一组不稳定的(有时会失败的)解析器,它们可能无法处理给定的文件,即给定的解析器要么以某种概率 p > 0 成功,要么总是失败(p=0).是否可以使用 RxJava 让这组解析器订阅传入文件流并 'race' 解析文件?

鉴于解析器最初可能会失败但仍然能够解析文件,因此有必要让它们使用一些退避策略进行重试。鉴于也可能没有解析器能够处理给定的文件,重试次数应该被限制。

使用 retryWhen 和类似这样的东西 (source):

实现指数退避相对容易
source.retryWhen(errors ->
                errors.zipWith(Observable.range(1, 3), (n, i) -> i)
                      .flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);

但是,设置平行比赛是我不知道该怎么做的事情。似乎 amb 运算符是我们在这里想要的,但将其应用于任意数量的流似乎需要使用 blockingIterable,这(我认为)破坏了比赛的目的,因为它会阻塞。我一直无法在互联网上找到与 amb 这个用例相关的任何有用信息。

到目前为止,我的尝试类似于这样:

Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers 
int numParsers = parserSet.size();

Flowable<Parser> parsers = Flowable.fromIterable(parserSet).repeat();

fileSource
    .flatMap(f -> parsers.take(numParsers)
                         .map(p -> p.parse(f))
                         .retryWhen(/* snippet from above */)
                         .onErrorReturn(/* some error value */)
             ).take(1) 

Flowable 引入了 .parallel() 运算符,它最近添加了 ParallelFailureHandling (see this pr),它有一个 RETRY 方法,但我可以' 似乎让可流动对象在其中一个返回后停止重试。

这个问题可以用 RxJava 解决吗?

合理假设您的解析器是同步的,例如

Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers 
int numParsers = parserSet.size();

ArrayList<Flowable<T>> parserObservableList = new ArrayList<>();

for (Parser p: parserSet) {
    parserObservableList.add(Flowable.fromCallable(() -> p.parse(f))
                                     .retryWhen(/* Add your retry logic */)
                                     .onErrorReturn(/* some error value */));
}

Flowable.amb(parserObservableList).subscribe(/* do what you want with the results */);

应该能满足你的要求。