在 zipWith 中使用范围也会在应用拉链功能之前发出范围序列中的所有项目

Using range in zipWith also emits all items from range sequence before zipper function applied

问题是关于 RxJava2 的。

注意到使用 range 压缩来自 retryWhenThrowable 会在应用压缩函数之前发出来自 Observable.range 的所有项目。此外,即使未调用 zipWithrange 也会发出序列。例如这个源代码

Observable.create<String> {
        println("subscribing")
        it.onError(RuntimeException("always fails"))
    }
    .retryWhen {
        it.zipWith(Observable.range(1, 3).doOnNext { println("range $it") },
                BiFunction { t: Throwable, i: Int -> i })
                .flatMap {
                    System.out.println("delay retry by $it + second(s)")
                    Observable.timer(it.toLong(), TimeUnit.SECONDS)
                }
    }./*subscribe*/

给出以下结果

 range 1
 range 2
 range 3
 subscribing
 delay retry by 1 + second(s)
 subscribing
 delay retry by 2 + second(s)
 subscribing
 delay retry by 3 + second(s)
 subscribing
 onComplete

observable 创建中替换 onError 也不会消除发射 range 项目。所以问题是为什么 Range 很冷。

Observable 在 2.x 中没有背压,因此 range 运算符将尽快发出所有项目。但是,您的情况可以使用随着重试处理程序的错误通知递增的普通计数器:

source.retryWhen(e -> {
    int[] counter = { 0 };
    return e.takeWhile(v -> ++counter[0] < 4)
            .flatMap(v -> Observable.timer(counter[0], TimeUnit.SECONDS));
})