RxJava Observable "Iteration" 是如何工作的?
How does RxJava Observable "Iteration" work?
我开始玩弄 RxJava 和 ReactFX,我对它非常着迷。但在我进行实验时,我有很多问题,而且我一直在寻找答案。
我观察到的一件事(没有双关语意)当然是延迟执行。通过下面的探索性代码,我注意到在调用 merge.subscribe(pet -> System.out.println(pet))
之前没有执行任何操作。但令我着迷的是,当我订阅第二个订阅者 merge.subscribe(pet -> System.out.println("Feed " + pet))
时,它又触发了 "iteration"。
我想了解的是迭代的行为。它的行为似乎不像只能使用一次的 Java 8 stream
。它真的是一次遍历每个 String
并将其作为该时刻的值发布吗?在之前被解雇的订阅者之后的任何新订阅者是否会收到这些项目,就好像它们是新的一样?
public class RxTest {
public static void main(String[] args) {
Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
.filter(dog -> dog.matches("D.*"));
Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));
Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));
Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);
merge.subscribe(pet -> System.out.println(pet));
merge.subscribe(pet -> System.out.println("Feed " + pet));
}
}
Observable<T>
代表一个monad,一个链式操作,而不是操作本身的执行。它是描述性语言,而不是您习惯的命令式语言。要执行操作,您 .subscribe()
就可以了。每次您订阅时,都会从头开始创建一个新的执行流。不要将流与线程混淆,因为 订阅是同步执行的,除非您将线程更改 指定为 .subscribeOn()
或 .observeOn()
。您将新元素链接到任何现有 operation/monad/Observable 以添加 new behaviour,例如更改线程、过滤、累积、转换等。如果您的可观察对象是一项昂贵的操作,您不想在每次订阅时重复,您可以使用 .cache()
.
来防止重新创建
要使任何 asynchronous/synchronous Observable<T>
操作成为同步内联操作,请使用 .toBlocking()
将其类型更改为 BlockingObservable<T>
。它包含新方法,而不是 .subscribe()
,可以使用 .forEach()
对每个结果执行操作,或者使用 .first()
强制执行操作
Observables 是一个很好的工具,因为它们大部分*是确定性的(相同的输入总是产生相同的输出,除非你做错了什么),可重复使用(你可以将它们作为 command/policy 模式的一部分发送) 并且在大多数情况下忽略并发,因为它们不应该依赖共享状态(a.k.a。做错事)。如果您尝试将基于可观察对象的库引入命令式语言,或者只是对您有 100% 信心管理良好的可观察对象执行操作,则 BlockingObservables 非常有用。
围绕这些原则构建您的应用程序是一种范式的改变,我无法在这个答案中真正涵盖。
*There are breaches like Subject
and Observable.create()
that are needed to integrate with imperative frameworks.
我开始玩弄 RxJava 和 ReactFX,我对它非常着迷。但在我进行实验时,我有很多问题,而且我一直在寻找答案。
我观察到的一件事(没有双关语意)当然是延迟执行。通过下面的探索性代码,我注意到在调用 merge.subscribe(pet -> System.out.println(pet))
之前没有执行任何操作。但令我着迷的是,当我订阅第二个订阅者 merge.subscribe(pet -> System.out.println("Feed " + pet))
时,它又触发了 "iteration"。
我想了解的是迭代的行为。它的行为似乎不像只能使用一次的 Java 8 stream
。它真的是一次遍历每个 String
并将其作为该时刻的值发布吗?在之前被解雇的订阅者之后的任何新订阅者是否会收到这些项目,就好像它们是新的一样?
public class RxTest {
public static void main(String[] args) {
Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
.filter(dog -> dog.matches("D.*"));
Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));
Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));
Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);
merge.subscribe(pet -> System.out.println(pet));
merge.subscribe(pet -> System.out.println("Feed " + pet));
}
}
Observable<T>
代表一个monad,一个链式操作,而不是操作本身的执行。它是描述性语言,而不是您习惯的命令式语言。要执行操作,您 .subscribe()
就可以了。每次您订阅时,都会从头开始创建一个新的执行流。不要将流与线程混淆,因为 订阅是同步执行的,除非您将线程更改 指定为 .subscribeOn()
或 .observeOn()
。您将新元素链接到任何现有 operation/monad/Observable 以添加 new behaviour,例如更改线程、过滤、累积、转换等。如果您的可观察对象是一项昂贵的操作,您不想在每次订阅时重复,您可以使用 .cache()
.
要使任何 asynchronous/synchronous Observable<T>
操作成为同步内联操作,请使用 .toBlocking()
将其类型更改为 BlockingObservable<T>
。它包含新方法,而不是 .subscribe()
,可以使用 .forEach()
对每个结果执行操作,或者使用 .first()
Observables 是一个很好的工具,因为它们大部分*是确定性的(相同的输入总是产生相同的输出,除非你做错了什么),可重复使用(你可以将它们作为 command/policy 模式的一部分发送) 并且在大多数情况下忽略并发,因为它们不应该依赖共享状态(a.k.a。做错事)。如果您尝试将基于可观察对象的库引入命令式语言,或者只是对您有 100% 信心管理良好的可观察对象执行操作,则 BlockingObservables 非常有用。
围绕这些原则构建您的应用程序是一种范式的改变,我无法在这个答案中真正涵盖。
*There are breaches like
Subject
andObservable.create()
that are needed to integrate with imperative frameworks.