RxJava Observable "Iteration" 是如何工作的?

How does RxJava Observable "Iteration" work?

我开始玩弄 RxJava 和 ReactFX,我对它非常着迷。但在我进行实验时,我有很多问题,而且我一直在寻找答案。

我观察到的一件事(没有双关语意)当然是延迟执行。通过下面的探索性代码,我注意到在调用 merge.subscribe(pet -> System.out.println(pet)) 之前没有执行任何操作。但令我着迷的是,当我订阅第二个订阅者 merge.subscribe(pet -> System.out.println("Feed " + pet)) 时,它又触发了 "iteration"。

我想了解的是迭代的行为。它的行为似乎不像只能使用一次的 Java 8 stream。它真的是一次遍历每个 String 并将其作为该时刻的值发布吗?在之前被解雇的订阅者之后的任何新订阅者是否会收到这些项目,就好像它们是新的一样?

public class RxTest {

    public static void main(String[] args) {

        Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
                .filter(dog -> dog.matches("D.*"));

        Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));

        Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));

        Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);

        merge.subscribe(pet -> System.out.println(pet));


        merge.subscribe(pet -> System.out.println("Feed " + pet));

    }
}

Observable<T>代表一个monad,一个链式操作,而不是操作本身的执行。它是描述性语言,而不是您习惯的命令式语言。要执行操作,您 .subscribe() 就可以了。每次您订阅时,都会从头开始创建一个新的执行流。不要将流与线程混淆,因为 订阅是同步执行的,除非您将线程更改 指定为 .subscribeOn().observeOn()。您将新元素链接到任何现有 operation/monad/Observable 以添加 new behaviour,例如更改线程、过滤、累积、转换等。如果您的可观察对象是一项昂贵的操作,您不想在每次订阅时重复,您可以使用 .cache().

来防止重新创建

要使任何 asynchronous/synchronous Observable<T> 操作成为同步内联操作,请使用 .toBlocking() 将其类型更改为 BlockingObservable<T>。它包含新方法,而不是 .subscribe(),可以使用 .forEach() 对每个结果执行操作,或者使用 .first()

强制执行操作

Observables 是一个很好的工具,因为它们大部分*是确定性的(相同的输入总是产生相同的输出,除非你做错了什么),可重复使用(你可以将它们作为 command/policy 模式的一部分发送) 并且在大多数情况下忽略并发,因为它们不应该依赖共享状态(a.k.a。做错事)。如果您尝试将基于可观察对象的库引入命令式语言,或者只是对您有 100% 信心管理良好的可观察对象执行操作,则 BlockingObservables 非常有用。

围绕这些原则构建您的应用程序是一种范式的改变,我无法在这个答案中真正涵盖。

*There are breaches like Subject and Observable.create() that are needed to integrate with imperative frameworks.