Observable.zip 和 Observable.timer

Observable.zip with Observable.timer

我正在尝试压缩两个可观察对象,在本例中是一个计时器任务和一个实际任务,这样我的订阅者只有在两个任务都完成时才会被调用。我想用 Observable.zip 来做到这一点,但不知何故这不起作用,我的 Observable.timer 没有调用 onComplete()。这意味着订阅者永远不会得到 Observables.

两者的结果

当我突然将 zip 更改为 merge 时,Observable.timer 会调用 onComplete(),我在这里缺少什么?

Observable<Long> delayObs = Observable.timer(500, TimeUnit.MILLISECONDS);
Observable<Void> methodObs = method();

mSubscription = Observable.zip(delayObs, methodObs, (pLong, pVoid) -> pVoid)
                              .subscribeOn(Schedulers.io())
                              .observeOn(AndroidSchedulers.mainThread())
                              .subscribe(__ -> thisDoesntGetCalled(), throwable -> M2Log.e(LOG_TAG,
                                      "error", throwable));

方法:

private Observable<Void> method() {
    return Observable.defer(() -> {
        // some work
        return Observable.empty();
    });
}

我是不是遗漏了什么?

Zip 组合值,但您的方法 () 的 Observable 不发出任何东西,因此 zip 决定它永远无法组合任何东西然后完成。

您可以将方法更改为此,现在 zip 会调用您提供的函数:

private Observable<Integer> method() {
    return Observable.defer(() -> {
        // some work
        return Observable.just(1);
    });
}