Rx(Java 的响应式扩展)带时间间隔的 Zip 运算符

Rx (Reactive Extensions for Java) Zip operator with time interval

我是 RxJava 的新手,我已经研究了一段时间的运算符。

我看到这个在短时间间隔 (1s) 后发出项目的小例子:

Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
        return d + " " + t;
    }).toBlocking().forEach(System.out::println);

这有效,但是当我删除将源变成 BlockingObservable 的 toBlocking() 时,程序执行并结束时没有输出。

我通常会看弹珠图来正确理解事物: http://reactivex.io/documentation/operators/zip.html

最后一句话说:它只会发出与发出最少项目的源 Observable 发出的项目数量一样多的项目。

这是否意味着 data Observable 在不到 1 秒的时间内发出所有项目并在打印每个 Observable 的前两个项目之前结束?因为每个 Observable 本身都是异步的?

我需要清楚地了解正在发生的事情,以及是否有其他方法可以处理类似的情况。有人吗?

基本上你的主程序在可观察对象有机会发出任何东西之前退出,这就是你看不到任何输出的原因。修复它的方法是以某种方式阻塞,直到 Observable 发出所有项目,这是使用 CountDownLatch:

的一种方法
CountDownLatch latch = new CountDownLatch(1);
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
    return d + " " + t;
}).finallyDo(latch::countDown).forEach(System.out::println);

latch.await(10, TimeUnit.SECONDS);

Observable.interval 在幕后使用 Scheduler。它将从另一个线程发出。与此同时,主线程已经完成了所有的组合,即将退出。大概您在 main 方法中有此代码,这就是您的程序退出的原因。

在真实系统中这应该不是问题(除非你的真实系统是一个包含这段代码的 main 方法)。

在示例程序中,您可以通过从标准输入读取一个字节来导致主线程阻塞。像这样:

Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> d + " " + t)
        .subscribe(System.out::println);

System.in.read();