RxJava2 发布
RxJava2 Publish
有什么区别
ObservableTransformer {
Observable.merge(
it.ofType(x).compose(transformerherex),
it.ofType(y).compose(transformerherey)
)
}
和
ObservableTransformer {
it.publish{ shared ->
Observable.merge(
shared.ofType(x).compose(transformerherex),
shared.ofType(y).compose(transformerherey)
)
}
}
当我 运行 我的代码使用这两个时,我得到了相同的结果。 publish 在这里做什么。
publish
运算符将您的 Observable 转换为 Connectable Observable
.
让我们看看 Connectable Observable
是什么意思:假设您想要多次订阅一个 observable 并希望为每个订阅者提供相同的项目。您需要使用 Connectable Observable
。
示例:
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Connect();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
输出:
first subscription : 0
first subscription : 1
second subscription : 1
first subscription : 2
second subscription : 2
在这种情况下,我们可以在第一个项目发布之前快速订阅,但仅限于第一个订阅。第二次订阅晚了,错过了第一次发布。
我们可以将 Connect() 方法的调用移动到完成所有订阅之后。这样,即使调用 Thread.Sleep 我们也不会真正订阅基础,直到两个订阅都完成。这将按如下方式完成:
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
observable.Connect();
输出:
first subscription : 0
second subscription : 0
first subscription : 1
second subscription : 1
first subscription : 2
second subscription : 2
所以使用 Completable Observable,我们有办法控制何时让 Observable 发射项目。
示例取自:http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect
编辑
根据 this link 中的第 180 张幻灯片:
发布的另一个性质是,如果任何观察者在 observable 开始发射项目 10 秒后开始观察,则观察者只会获得 10 秒后(订阅时)发射的项目,而不是所有项目。所以在侧面,据我所知,发布正在用于 UI 事件。完全有道理的是,任何观察者都应该只接收在它订阅之后执行的那些事件,而不是之前发生的所有事件。
希望对您有所帮助。
不同之处在于,对于下游的一次订阅,顶级转换器会两次订阅上游,从而复制通常不需要的上游的任何副作用:
Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
.doOnSubscribe(s -> System.out.println("Subscribed!"));
mixedSource.compose(f ->
Observable.merge(
f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
)
)
.subscribe(System.out::println);
将打印
Subscribed!
2
3
4
Subscribed!
A
B
C
此处表示的副作用是打印输出 Subscribed!
根据真实源中的实际工作,这可能意味着发送电子邮件两次,检索 table 的行两次。通过这个特定示例,您可以看到即使源值在其类型中交错,输出也会单独包含它们。
相比之下,publish(Function)
将为每个最终订阅者建立一个对源的订阅,因此源的任何副作用只会发生一次。
mixedSource.publish(f ->
Observable.merge(
f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
)
)
.subscribe(System.out::println);
打印
Subscribed!
A
2
B
3
C
4
因为源被订阅一次并且每个项目都被多播到 .ofType().compose()
的两个 "arms"。
有什么区别
ObservableTransformer {
Observable.merge(
it.ofType(x).compose(transformerherex),
it.ofType(y).compose(transformerherey)
)
}
和
ObservableTransformer {
it.publish{ shared ->
Observable.merge(
shared.ofType(x).compose(transformerherex),
shared.ofType(y).compose(transformerherey)
)
}
}
当我 运行 我的代码使用这两个时,我得到了相同的结果。 publish 在这里做什么。
publish
运算符将您的 Observable 转换为 Connectable Observable
.
让我们看看 Connectable Observable
是什么意思:假设您想要多次订阅一个 observable 并希望为每个订阅者提供相同的项目。您需要使用 Connectable Observable
。
示例:
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Connect();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
输出:
first subscription : 0
first subscription : 1
second subscription : 1
first subscription : 2
second subscription : 2
在这种情况下,我们可以在第一个项目发布之前快速订阅,但仅限于第一个订阅。第二次订阅晚了,错过了第一次发布。
我们可以将 Connect() 方法的调用移动到完成所有订阅之后。这样,即使调用 Thread.Sleep 我们也不会真正订阅基础,直到两个订阅都完成。这将按如下方式完成:
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
observable.Connect();
输出:
first subscription : 0
second subscription : 0
first subscription : 1
second subscription : 1
first subscription : 2
second subscription : 2
所以使用 Completable Observable,我们有办法控制何时让 Observable 发射项目。
示例取自:http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect
编辑 根据 this link 中的第 180 张幻灯片:
发布的另一个性质是,如果任何观察者在 observable 开始发射项目 10 秒后开始观察,则观察者只会获得 10 秒后(订阅时)发射的项目,而不是所有项目。所以在侧面,据我所知,发布正在用于 UI 事件。完全有道理的是,任何观察者都应该只接收在它订阅之后执行的那些事件,而不是之前发生的所有事件。
希望对您有所帮助。
不同之处在于,对于下游的一次订阅,顶级转换器会两次订阅上游,从而复制通常不需要的上游的任何副作用:
Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
.doOnSubscribe(s -> System.out.println("Subscribed!"));
mixedSource.compose(f ->
Observable.merge(
f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
)
)
.subscribe(System.out::println);
将打印
Subscribed!
2
3
4
Subscribed!
A
B
C
此处表示的副作用是打印输出 Subscribed!
根据真实源中的实际工作,这可能意味着发送电子邮件两次,检索 table 的行两次。通过这个特定示例,您可以看到即使源值在其类型中交错,输出也会单独包含它们。
相比之下,publish(Function)
将为每个最终订阅者建立一个对源的订阅,因此源的任何副作用只会发生一次。
mixedSource.publish(f ->
Observable.merge(
f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
)
)
.subscribe(System.out::println);
打印
Subscribed!
A
2
B
3
C
4
因为源被订阅一次并且每个项目都被多播到 .ofType().compose()
的两个 "arms"。