RetryWhen() 延迟输入错误

Type error on RetryWhen() with delay

我在关注 this tutorial 然后添加了这一行:

.retryWhen(errors -> errors.flatMap(error -> Observable.timer(30, TimeUnit.SECONDS)))

给我的Transformer,但是它触发了一个编译错误:

error: incompatible types: cannot infer type-variable(s) R (argument mismatch; bad return type in lambda expression Observable cannot be converted to Publisher) where R,T are type-variables: R extends Object declared in method flatMap(Function>) T extends Object declared in class Flowable

下面的波浪线 error -> Observable.timer(30, TimeUnit.SECONDS) 表示:

no instance(s) of type variable(s) R exist so that Observable conforms to Publisher

我做错了什么?要像示例中那样工作还缺少什么?

显然使用 Flowable.timer() 而不是 Observable.timer 解决了这个问题,也许它只与 Single 有关?

What am I doing wrong?

请检查运算符的签名,以便您使用正确的类型:https://github.com/ReactiveX/RxJava#base-class-vs-base-type

JavaDoc:

public final Single<T> retryWhen(
    Function<? super Flowable<Throwable>,? extends Publisher<?>> handler)

Because this tutorial uses Observable.timer with no issues.

该教程早于 RxJava 2。事实上,上面链接的 Javadoc 包含一个示例 Flowable.timer():

Single.timer(1, TimeUnit.SECONDS)
.doOnSubscribe(s -> System.out.println("subscribing"))
.map(v -> { throw new RuntimeException(); })
.retryWhen(errors -> {
    AtomicInteger counter = new AtomicInteger();
    return errors
              .takeWhile(e -> counter.getAndIncrement() != 3)
              .flatMap(e -> {
                  System.out.println("delay retry by " + counter.get() + " second(s)");
//                vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
                  return Flowable.timer(counter.get(), TimeUnit.SECONDS);
//                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
              });
})
.blockingGet();

is this something only related to Single

retryWhenrepeatWhen 在设计上使用 Publisher 作为重做信号,这样我们就可以利用背压一次只请求一个这样的重做信号。使用 Observable,处理程序有可能一次简单地转储大量信号,操作员可能会出现意外行为。