在 android 上创建一个可观察发射器
Create an observable emitter on android
我有一个 android 应用程序,它从 websocket 连接接收数据。此数据存储在 Hashmap 上,必须从应用程序的某些点恢复。
我已经创建了一个 "get" 方法,该方法使用该数据生成 Observable.just() ,但是,当从 websocket 接收到数据时,我需要另一个 Observable.just() 以便第一个订阅者observable 可以接收到更新后的数据,我该怎么做呢?我是否必须创建一个 Observable 对象并始终对该 Observable 执行 "just"?
我必须如何发送数据才能始终为同一个订阅者接收(与领域相同的方式)?
提前致谢
I need another Observable.just() when the data is received from the
websocket so the subscriber of the first observable can receive the
updated data, how should I do that?
主题是您可能正在寻找的内容。 Subject是一个对象,可以和Subscriber
和Observable
同时行动。如果您的 get() returns Subject#asObservable
而不是 Observable.just
,并且每次您从调用 Subject#onNext
的 websocket 获取新数据时,您将获得所需的行为。 Subject
本身是抽象的,但是RxJava
提供了一些具体的实现。请参考documentation了解它们之间的区别,选择更适合您的。
RxJava 有 Hot Observables,在你的情况下你需要使用 subject,将一个 observable 的发射传递给另一个。
检查这个单元测试
/**
* In this example we see how using hot observables ReplaySubject we can emit an item on broadcast to all the observers(subscribers).
*
* @throws InterruptedException
*/
@Test
public void testHotObservableUsingReplaySubject2() throws InterruptedException {
Observable<Long> interval = Observable.interval(100L, TimeUnit.MILLISECONDS);
Subject<Long, Long> publishSubject = ReplaySubject.create(1);
interval.subscribe(publishSubject);
Thread.sleep(1000L);
publishSubject.subscribe(System.out::println, (e) -> System.err.println(e.getMessage()), System.out::println);
}
您可以在此处查看更多 Hot Observables 示例 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/connectable/HotObservable.java
我有一个 android 应用程序,它从 websocket 连接接收数据。此数据存储在 Hashmap 上,必须从应用程序的某些点恢复。 我已经创建了一个 "get" 方法,该方法使用该数据生成 Observable.just() ,但是,当从 websocket 接收到数据时,我需要另一个 Observable.just() 以便第一个订阅者observable 可以接收到更新后的数据,我该怎么做呢?我是否必须创建一个 Observable 对象并始终对该 Observable 执行 "just"? 我必须如何发送数据才能始终为同一个订阅者接收(与领域相同的方式)?
提前致谢
I need another Observable.just() when the data is received from the websocket so the subscriber of the first observable can receive the updated data, how should I do that?
主题是您可能正在寻找的内容。 Subject是一个对象,可以和Subscriber
和Observable
同时行动。如果您的 get() returns Subject#asObservable
而不是 Observable.just
,并且每次您从调用 Subject#onNext
的 websocket 获取新数据时,您将获得所需的行为。 Subject
本身是抽象的,但是RxJava
提供了一些具体的实现。请参考documentation了解它们之间的区别,选择更适合您的。
RxJava 有 Hot Observables,在你的情况下你需要使用 subject,将一个 observable 的发射传递给另一个。
检查这个单元测试
/**
* In this example we see how using hot observables ReplaySubject we can emit an item on broadcast to all the observers(subscribers).
*
* @throws InterruptedException
*/
@Test
public void testHotObservableUsingReplaySubject2() throws InterruptedException {
Observable<Long> interval = Observable.interval(100L, TimeUnit.MILLISECONDS);
Subject<Long, Long> publishSubject = ReplaySubject.create(1);
interval.subscribe(publishSubject);
Thread.sleep(1000L);
publishSubject.subscribe(System.out::println, (e) -> System.err.println(e.getMessage()), System.out::println);
}
您可以在此处查看更多 Hot Observables 示例 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/connectable/HotObservable.java