RxJava: return 一个缓存值 synchronously/immediately
RxJava: return a cached value synchronously/immediately
我正在寻找是否有办法 return 从 Observable 同步缓存值,否则可能需要很长时间才能发出。当然,如果它需要做它的io/computation,那么应该在计算线程上做,但如果它之前已经做过,那么应该是同步的,避免在线程之间来回跳转。下面是我的意思的一些示例代码:
public void bind(ItemViewHolder holder) {
getCalculationObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(value -> {
holder.showValue(value); // This will happen after bind has finished
}
});
}
public Observable<Integer> getCalculationObservable() {
if (mObservable == null) {
mObservable = Observable.fromCallable(this::calculate)
.subscribeOn(Schedulers.computation())
.cache();
}
return mObservable;
}
public int calculate() throws InterruptedException {
Thread.sleep(1000);
return mValue * 1000;
}
编辑:为了说明我在说什么:
void onRunSchedulerExampleButtonClicked() throws InterruptedException {
Observable<Integer> observable = Observable
.fromCallable(this::calculate)
.subscribeOn(Schedulers.computation())
.cache();
observable
.doOnNext(value -> {
Log.e("log", "first onNext()");
})
.test().await();
observable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(value -> {
Log.e("log", "second onNext()");
})
.test().await();
Log.e("log", "this is first.");
}
结果:
E/log: onClick
E/log: first onNext()
E/log: this is first.
E/log: second onNext()
为了进一步说明这一点,如果您在第二个 onNext 链上添加一个 await(),您将永远不会完成,因为它将等待在您阻塞的同一线程中排队的事物。
更新:
当应用 observeOn
和 AndroidSchedulers.mainThread()
调度程序时,下游事件通过在内部使用 postDelayed
发布到 MessageQueue
。这就是为什么位于第二个 Observable 之后的代码在 Observable 完成的 之前执行 (或者如果我们使用 test().await()
则冻结)。一种可能的解决方案是使用 Subjects
作为数据源和订阅者之间的代理。查看这篇文章以获取更多信息 - Keep Your Main Thread Synchronous.
还有有用的文章:
解释为什么cache
不切换线程:
您的 Observable
已经 returns 同步缓存了值,因为 cache
没有为每个订阅者订阅整个上游(因此在您的情况下它不会切换线程)。它执行一次,然后只记住项目的顺序。对于每个新订阅者,cache
只是重播它。
示例:
(用 Kotlin 编写)
//here is the same logic as yours
private var observable: Observable<Int>? = null
get() {
if(field==null)
field = Observable.fromCallable {
System.out.println("callable: execution thread - ${Thread.currentThread().name}")
Thread.sleep(1000)
return@fromCallable 1000
}
.subscribeOn(Schedulers.computation())
.doOnNext { System.out.println("cached Observable: before cache() - doOnNext execution thread - ${Thread.currentThread().name}") }
.doOnComplete { System.out.println("cached Observable: before cache() - doOnComplete execution thread - ${Thread.currentThread().name}") }
.cache()
.doOnNext { System.out.println("cached Observable: after cache() - doOnNext execution thread - ${Thread.currentThread().name}") }
.doOnComplete { System.out.println("cached Observable: after cache() - doOnComplete execution thread - ${Thread.currentThread().name}") }
return field
}
@Test
fun test() {
observable!!
.doOnSubscribe { System.out.println("first get: doOnSubscribe execution thread - ${Thread.currentThread().name}") }
.doOnNext { System.out.println("first get: doOnNext execution thread - ${Thread.currentThread().name}") }
.doOnComplete { System.out.println("first get: doOnComplete execution thread - ${Thread.currentThread().name}") }
.test()
.await()
System.out.println("---------- first get executed ------------")
observable!!
.doOnSubscribe { System.out.println("second get: doOnSubscribe execution thread - ${Thread.currentThread().name}") }
.doOnNext { System.out.println("second get: doOnNext execution thread - ${Thread.currentThread().name}") }
.doOnComplete { System.out.println("second get: doOnComplete execution thread - ${Thread.currentThread().name}") }
.subscribe()
}
输出:
first get: doOnSubscribe execution thread - main
callable: body execution thread - RxComputationThreadPool-1
cached Observable: before cache() - doOnNext execution thread - RxComputationThreadPool-1
cached Observable: after cache() - doOnNext execution thread - RxComputationThreadPool-1
first get: doOnNext execution thread - RxComputationThreadPool-1
cached Observable: before cache() - doOnComplete execution thread - RxComputationThreadPool-1
cached Observable: after cache() - doOnComplete execution thread - RxComputationThreadPool-1
first get: doOnComplete execution thread - RxComputationThreadPool-1
---------- first get executed ------------
second get: doOnSubscribe execution thread - main
cached Observable: after cache() - doOnNext execution thread - main
second get: doOnNext execution thread - main
cached Observable: after cache() - doOnComplete execution thread - main
second get: doOnComplete execution thread - main
如您所见,当有缓存值时,线程不会切换。
P.S。我假设你使用 RxJava2.
我正在寻找是否有办法 return 从 Observable 同步缓存值,否则可能需要很长时间才能发出。当然,如果它需要做它的io/computation,那么应该在计算线程上做,但如果它之前已经做过,那么应该是同步的,避免在线程之间来回跳转。下面是我的意思的一些示例代码:
public void bind(ItemViewHolder holder) {
getCalculationObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(value -> {
holder.showValue(value); // This will happen after bind has finished
}
});
}
public Observable<Integer> getCalculationObservable() {
if (mObservable == null) {
mObservable = Observable.fromCallable(this::calculate)
.subscribeOn(Schedulers.computation())
.cache();
}
return mObservable;
}
public int calculate() throws InterruptedException {
Thread.sleep(1000);
return mValue * 1000;
}
编辑:为了说明我在说什么:
void onRunSchedulerExampleButtonClicked() throws InterruptedException {
Observable<Integer> observable = Observable
.fromCallable(this::calculate)
.subscribeOn(Schedulers.computation())
.cache();
observable
.doOnNext(value -> {
Log.e("log", "first onNext()");
})
.test().await();
observable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(value -> {
Log.e("log", "second onNext()");
})
.test().await();
Log.e("log", "this is first.");
}
结果:
E/log: onClick
E/log: first onNext()
E/log: this is first.
E/log: second onNext()
为了进一步说明这一点,如果您在第二个 onNext 链上添加一个 await(),您将永远不会完成,因为它将等待在您阻塞的同一线程中排队的事物。
更新:
当应用 observeOn
和 AndroidSchedulers.mainThread()
调度程序时,下游事件通过在内部使用 postDelayed
发布到 MessageQueue
。这就是为什么位于第二个 Observable 之后的代码在 Observable 完成的 之前执行 (或者如果我们使用 test().await()
则冻结)。一种可能的解决方案是使用 Subjects
作为数据源和订阅者之间的代理。查看这篇文章以获取更多信息 - Keep Your Main Thread Synchronous.
还有有用的文章:
解释为什么cache
不切换线程:
您的 Observable
已经 returns 同步缓存了值,因为 cache
没有为每个订阅者订阅整个上游(因此在您的情况下它不会切换线程)。它执行一次,然后只记住项目的顺序。对于每个新订阅者,cache
只是重播它。
示例: (用 Kotlin 编写)
//here is the same logic as yours
private var observable: Observable<Int>? = null
get() {
if(field==null)
field = Observable.fromCallable {
System.out.println("callable: execution thread - ${Thread.currentThread().name}")
Thread.sleep(1000)
return@fromCallable 1000
}
.subscribeOn(Schedulers.computation())
.doOnNext { System.out.println("cached Observable: before cache() - doOnNext execution thread - ${Thread.currentThread().name}") }
.doOnComplete { System.out.println("cached Observable: before cache() - doOnComplete execution thread - ${Thread.currentThread().name}") }
.cache()
.doOnNext { System.out.println("cached Observable: after cache() - doOnNext execution thread - ${Thread.currentThread().name}") }
.doOnComplete { System.out.println("cached Observable: after cache() - doOnComplete execution thread - ${Thread.currentThread().name}") }
return field
}
@Test
fun test() {
observable!!
.doOnSubscribe { System.out.println("first get: doOnSubscribe execution thread - ${Thread.currentThread().name}") }
.doOnNext { System.out.println("first get: doOnNext execution thread - ${Thread.currentThread().name}") }
.doOnComplete { System.out.println("first get: doOnComplete execution thread - ${Thread.currentThread().name}") }
.test()
.await()
System.out.println("---------- first get executed ------------")
observable!!
.doOnSubscribe { System.out.println("second get: doOnSubscribe execution thread - ${Thread.currentThread().name}") }
.doOnNext { System.out.println("second get: doOnNext execution thread - ${Thread.currentThread().name}") }
.doOnComplete { System.out.println("second get: doOnComplete execution thread - ${Thread.currentThread().name}") }
.subscribe()
}
输出:
first get: doOnSubscribe execution thread - main
callable: body execution thread - RxComputationThreadPool-1
cached Observable: before cache() - doOnNext execution thread - RxComputationThreadPool-1
cached Observable: after cache() - doOnNext execution thread - RxComputationThreadPool-1
first get: doOnNext execution thread - RxComputationThreadPool-1
cached Observable: before cache() - doOnComplete execution thread - RxComputationThreadPool-1
cached Observable: after cache() - doOnComplete execution thread - RxComputationThreadPool-1
first get: doOnComplete execution thread - RxComputationThreadPool-1
---------- first get executed ------------
second get: doOnSubscribe execution thread - main
cached Observable: after cache() - doOnNext execution thread - main
second get: doOnNext execution thread - main
cached Observable: after cache() - doOnComplete execution thread - main
second get: doOnComplete execution thread - main
如您所见,当有缓存值时,线程不会切换。
P.S。我假设你使用 RxJava2.