RxJava:结合冷热可观察
RxJava: Combine hot and cold observable
我有两个 Observable。它们都是 Observable<T>
类型。
一个是冷的,叫做 initialValueObservable
,它只是从 Observable.from()
的项目列表中发出。
另一个是名为 valueUpdateObservable
的热门项目,它是一个 PublishSubject
,当有新项目时通知订阅者。
在客户端中,我想订阅两者,因此我从 initialValueObservable
和 valueUpdateObservable
发布的更新中获取初始值。
我最初的方法是合并它们,但我认为这不会起作用,因为 initialValueObservable
将发送 onComplete
,此时 valueUpdateObservable
发出的新项目将不会到达。
当您进行合并时,onComplete
仅在所有源可观察对象(在本例中为两者)完成时才发送。
以这段(c#Rx.NET)代码为例:
Observable
.Return(42L)
.Merge(Observable.Interval(TimeSpan.FromSeconds(1.0)).Take(3))
.Subscribe(x => Console.WriteLine(x));
它产生:
42
0
1
2
如果 RxJava 做了任何不同的事情,我会感到震惊。
万一有人来找这个,这里有一个例子
Observable.concat(
coldObservable.doOnComplete{ () -> Log.d(TAG, "cold observable completed"); },
hotObservable.doOnComplete{ () -> Log.d(TAG, "hot observable completed") }
.subscribe()
将记录所有 coldObservable 的值,然后是 "cold observable completed",然后是所有 hotObvservable 的值。 "hot observable completed" 不会被记录,假设它类似于没有完成状态的点击侦听器
我有两个 Observable。它们都是 Observable<T>
类型。
一个是冷的,叫做 initialValueObservable
,它只是从 Observable.from()
的项目列表中发出。
另一个是名为 valueUpdateObservable
的热门项目,它是一个 PublishSubject
,当有新项目时通知订阅者。
在客户端中,我想订阅两者,因此我从 initialValueObservable
和 valueUpdateObservable
发布的更新中获取初始值。
我最初的方法是合并它们,但我认为这不会起作用,因为 initialValueObservable
将发送 onComplete
,此时 valueUpdateObservable
发出的新项目将不会到达。
当您进行合并时,onComplete
仅在所有源可观察对象(在本例中为两者)完成时才发送。
以这段(c#Rx.NET)代码为例:
Observable
.Return(42L)
.Merge(Observable.Interval(TimeSpan.FromSeconds(1.0)).Take(3))
.Subscribe(x => Console.WriteLine(x));
它产生:
42
0
1
2
如果 RxJava 做了任何不同的事情,我会感到震惊。
万一有人来找这个,这里有一个例子
Observable.concat(
coldObservable.doOnComplete{ () -> Log.d(TAG, "cold observable completed"); },
hotObservable.doOnComplete{ () -> Log.d(TAG, "hot observable completed") }
.subscribe()
将记录所有 coldObservable 的值,然后是 "cold observable completed",然后是所有 hotObvservable 的值。 "hot observable completed" 不会被记录,假设它类似于没有完成状态的点击侦听器