repeatWhen() 没有完成

repeatWhen() does not complete

我 运行 遇到了让 repeatWhen()Completable 一起工作的问题。下面的代码块正在工作。订阅的 Action 在重复后结束 运行。

completable
.subscribeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.repeat(2)
.observeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.subscribe(new Action() {
    @Override
    public void run() throws Exception {
        //runs
    }
}

但是,当我尝试以下操作时

completable
.subscribeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.repeatWhen(new MyRetry(2))
.observeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.subscribe(new Action() {
    @Override
    public void run() throws Exception {
        //doesn't run
    }
}

MyRetry定义如下:

class MyRetry implements Function<Flowable<Object>, Publisher<?>> {

    private final int maxRetryCount;
    private int retryCount;

    /**
     * @param maxRetryCount The number of times to repeat
     */
    public MyRetry(final int maxRetryCount) {
        this.maxRetryCount = maxRetryCount;
        retryCount = 0;
    }

    @Override
    public Publisher<?> apply(final Flowable<Object> objectFlowable) throws Exception {
        return objectFlowable.flatMap(new Function<Object, Publisher<?>>() {
            @Override
            public Publisher<?> apply(final Object o) throws Exception {
                if (retryCount < maxRetryCount) {
                    retryCount++;
                    return Flowable.just(o);
                } else {
                    return Flowable.empty();
                }
            }
        });
    }
}

订阅的Action在重复完成后不再运行,但是Completable的原始订阅重复了两次。这让我相信我返回 Flowable.empty() 可能不正确,但我找不到关于这个主题的信息,而且我对 RxJava 还很陌生。我也有基础设施限制,让我仍然停留在 Java 7 in Android.

如果您在 MyRetry class:

中将 flatMap() 更改为 takeUntil(),初始订阅者将收到 onCompleted() 事件
@Override
public Publisher<?> apply(final Flowable<Object> objectFlowable) throws Exception {
    return objectFlowable.takeWhile(o -> retryCount++ < maxRetryCount);
}

此问题已在 this 中讨论 线程.