如何使用 RxJava 2 来安排重试比赛?
How do I use RxJava 2 to orchestrate a race with retry?
假设我们有一组不稳定的(有时会失败的)解析器,它们可能无法处理给定的文件,即给定的解析器要么以某种概率 p > 0
成功,要么总是失败(p=0
).是否可以使用 RxJava 让这组解析器订阅传入文件流并 'race' 解析文件?
鉴于解析器最初可能会失败但仍然能够解析文件,因此有必要让它们使用一些退避策略进行重试。鉴于也可能没有解析器能够处理给定的文件,重试次数应该被限制。
使用 retryWhen
和类似这样的东西 (source):
实现指数退避相对容易
source.retryWhen(errors ->
errors.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);
但是,设置平行比赛是我不知道该怎么做的事情。似乎 amb
运算符是我们在这里想要的,但将其应用于任意数量的流似乎需要使用 blockingIterable
,这(我认为)破坏了比赛的目的,因为它会阻塞。我一直无法在互联网上找到与 amb
这个用例相关的任何有用信息。
到目前为止,我的尝试类似于这样:
Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers
int numParsers = parserSet.size();
Flowable<Parser> parsers = Flowable.fromIterable(parserSet).repeat();
fileSource
.flatMap(f -> parsers.take(numParsers)
.map(p -> p.parse(f))
.retryWhen(/* snippet from above */)
.onErrorReturn(/* some error value */)
).take(1)
Flowable
引入了 .parallel()
运算符,它最近添加了 ParallelFailureHandling
(see this pr),它有一个 RETRY
方法,但我可以' 似乎让可流动对象在其中一个返回后停止重试。
这个问题可以用 RxJava 解决吗?
合理假设您的解析器是同步的,例如
Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers
int numParsers = parserSet.size();
ArrayList<Flowable<T>> parserObservableList = new ArrayList<>();
for (Parser p: parserSet) {
parserObservableList.add(Flowable.fromCallable(() -> p.parse(f))
.retryWhen(/* Add your retry logic */)
.onErrorReturn(/* some error value */));
}
Flowable.amb(parserObservableList).subscribe(/* do what you want with the results */);
应该能满足你的要求。
假设我们有一组不稳定的(有时会失败的)解析器,它们可能无法处理给定的文件,即给定的解析器要么以某种概率 p > 0
成功,要么总是失败(p=0
).是否可以使用 RxJava 让这组解析器订阅传入文件流并 'race' 解析文件?
鉴于解析器最初可能会失败但仍然能够解析文件,因此有必要让它们使用一些退避策略进行重试。鉴于也可能没有解析器能够处理给定的文件,重试次数应该被限制。
使用 retryWhen
和类似这样的东西 (source):
source.retryWhen(errors ->
errors.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);
但是,设置平行比赛是我不知道该怎么做的事情。似乎 amb
运算符是我们在这里想要的,但将其应用于任意数量的流似乎需要使用 blockingIterable
,这(我认为)破坏了比赛的目的,因为它会阻塞。我一直无法在互联网上找到与 amb
这个用例相关的任何有用信息。
到目前为止,我的尝试类似于这样:
Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers
int numParsers = parserSet.size();
Flowable<Parser> parsers = Flowable.fromIterable(parserSet).repeat();
fileSource
.flatMap(f -> parsers.take(numParsers)
.map(p -> p.parse(f))
.retryWhen(/* snippet from above */)
.onErrorReturn(/* some error value */)
).take(1)
Flowable
引入了 .parallel()
运算符,它最近添加了 ParallelFailureHandling
(see this pr),它有一个 RETRY
方法,但我可以' 似乎让可流动对象在其中一个返回后停止重试。
这个问题可以用 RxJava 解决吗?
合理假设您的解析器是同步的,例如
Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers
int numParsers = parserSet.size();
ArrayList<Flowable<T>> parserObservableList = new ArrayList<>();
for (Parser p: parserSet) {
parserObservableList.add(Flowable.fromCallable(() -> p.parse(f))
.retryWhen(/* Add your retry logic */)
.onErrorReturn(/* some error value */));
}
Flowable.amb(parserObservableList).subscribe(/* do what you want with the results */);
应该能满足你的要求。