获取 Observable 的最新值并立即发出
Get the latest value of an Observable and emit it immeditely
我正在尝试获取给定 Observable
的最新值并让它发出
一旦它被调用就立即。以下面的代码为例:
return Observable.just(myObservable.last())
.flatMap(myObservable1 -> {
return myObservable1;
})
.map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object
这是行不通的,因为这样做 flatMap
会发出 myObservable1
,而 myObservable1
又会有
发射以到达 map
。
我不知道是否有可能做这样的事情。有没有人知道如何实现这个目标?谢谢
last()
方法在这里不会有任何帮助,因为它等待 Observable 终止给你最后一个发射的项目。
假设您无法控制发射的 observable,您可以简单地创建一个 BehaviorSubject
并将其订阅到发射您想要收听的数据的 observable,然后订阅创建的主题。由于 Subject
既是 Observable
又是 Subscriber
,你会得到你想要的。
我认为(现在没有时间检查)您可能必须手动取消订阅原始 Observable,因为 BehaviorSubject
一旦他的所有订阅者取消订阅将不会自动取消订阅。
像这样:
BehaviorSubject subject = new BehaviorSubject();
hotObservable.subscribe(subject);
subject.subscribe(thing -> {
// Here just after subscribing
// you will receive the last emitted item, if there was any.
// You can also always supply the first item to the behavior subject
});
http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html
在 RxJava 中,subscriber.onXXX 被称为 asynchronous.It 意味着如果你的 Observable 在新线程中发射项目,你永远无法获得 return 之前的最后一个项目,除非你阻塞线程并等待 item.But 如果 Observable 同步发出项目并且您不通过 subscribeOn 和 observOn 更改它的线程,
如代码:
Observable.just(1,2,3).subscribe();
在这种情况下,您可以通过以下方式获取最后一项:
Integer getLast(Observable<Integer> o){
final int[] ret = new int[1];
Observable.last().subscribe(i -> ret[0] = i);
return ret[0];
}
这样做是个坏主意 this.RxJava 更喜欢你用它来做异步工作。
你在这里真正想要实现的是将异步任务转换为同步任务。
有多种方法可以实现,每种方法各有利弊:
- 使用toBlocking() - it means that this thread will be BLOCKED, until the stream is finish, in order to get only one item simply use first(),因为它会在物品交付后完成。
假设您的整个信息流是
Observable<T> getData();
那么立即获取最后一个值的方法将如下所示:
public T getLastItem(){
return getData().toBlocking().first();
}
请不要使用 last(),因为它会等待流完成,然后才会发出最后一个项目。
如果您的流是一个网络请求并且它还没有得到任何项目,这将阻止您的线程!,所以只有在您确定有一个项目立即可用时才使用它(或者如果您真的想要一个阻止...)
另一种选择是简单地缓存最后的结果,像这样:
getData().subscribe(t-> cachedT = t;) //代码中的某处,它会继续保存最后交付的项目
public T getLastItem(){
return缓存T;
}
如果在您请求时没有发送任何项目,您将得到 null 或您设置的任何初始值。
这种方法的问题是订阅阶段可能发生在获取之后,如果在 2 个不同的线程中使用,可能会产生竞争条件。
我正在尝试获取给定 Observable
的最新值并让它发出
一旦它被调用就立即。以下面的代码为例:
return Observable.just(myObservable.last())
.flatMap(myObservable1 -> {
return myObservable1;
})
.map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object
这是行不通的,因为这样做 flatMap
会发出 myObservable1
,而 myObservable1
又会有
发射以到达 map
。
我不知道是否有可能做这样的事情。有没有人知道如何实现这个目标?谢谢
last()
方法在这里不会有任何帮助,因为它等待 Observable 终止给你最后一个发射的项目。
假设您无法控制发射的 observable,您可以简单地创建一个 BehaviorSubject
并将其订阅到发射您想要收听的数据的 observable,然后订阅创建的主题。由于 Subject
既是 Observable
又是 Subscriber
,你会得到你想要的。
我认为(现在没有时间检查)您可能必须手动取消订阅原始 Observable,因为 BehaviorSubject
一旦他的所有订阅者取消订阅将不会自动取消订阅。
像这样:
BehaviorSubject subject = new BehaviorSubject();
hotObservable.subscribe(subject);
subject.subscribe(thing -> {
// Here just after subscribing
// you will receive the last emitted item, if there was any.
// You can also always supply the first item to the behavior subject
});
http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html
在 RxJava 中,subscriber.onXXX 被称为 asynchronous.It 意味着如果你的 Observable 在新线程中发射项目,你永远无法获得 return 之前的最后一个项目,除非你阻塞线程并等待 item.But 如果 Observable 同步发出项目并且您不通过 subscribeOn 和 observOn 更改它的线程, 如代码:
Observable.just(1,2,3).subscribe();
在这种情况下,您可以通过以下方式获取最后一项:
Integer getLast(Observable<Integer> o){
final int[] ret = new int[1];
Observable.last().subscribe(i -> ret[0] = i);
return ret[0];
}
这样做是个坏主意 this.RxJava 更喜欢你用它来做异步工作。
你在这里真正想要实现的是将异步任务转换为同步任务。
有多种方法可以实现,每种方法各有利弊:
- 使用toBlocking() - it means that this thread will be BLOCKED, until the stream is finish, in order to get only one item simply use first(),因为它会在物品交付后完成。
假设您的整个信息流是
Observable<T> getData();
那么立即获取最后一个值的方法将如下所示:
public T getLastItem(){
return getData().toBlocking().first();
}
请不要使用 last(),因为它会等待流完成,然后才会发出最后一个项目。
如果您的流是一个网络请求并且它还没有得到任何项目,这将阻止您的线程!,所以只有在您确定有一个项目立即可用时才使用它(或者如果您真的想要一个阻止...)
另一种选择是简单地缓存最后的结果,像这样:
getData().subscribe(t-> cachedT = t;) //代码中的某处,它会继续保存最后交付的项目 public T getLastItem(){ return缓存T; }
如果在您请求时没有发送任何项目,您将得到 null 或您设置的任何初始值。 这种方法的问题是订阅阶段可能发生在获取之后,如果在 2 个不同的线程中使用,可能会产生竞争条件。