RxJava: return 一个缓存值 synchronously/immediately

RxJava: return a cached value synchronously/immediately

我正在寻找是否有办法 return 从 Observable 同步缓存值,否则可能需要很长时间才能发出。当然,如果它需要做它的io/computation,那么应该在计算线程上做,但如果它之前已经做过,那么应该是同步的,避免在线程之间来回跳转。下面是我的意思的一些示例代码:

public void bind(ItemViewHolder holder) {
    getCalculationObservable()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(value -> {
                    holder.showValue(value); // This will happen after bind has finished
                }
            });
}

public Observable<Integer> getCalculationObservable() {
    if (mObservable == null) {

        mObservable = Observable.fromCallable(this::calculate)
                .subscribeOn(Schedulers.computation())
                .cache();

    }
    return mObservable;
}

public int calculate() throws InterruptedException {
    Thread.sleep(1000);
    return mValue * 1000;
}

编辑:为了说明我在说什么:

void onRunSchedulerExampleButtonClicked() throws InterruptedException {


    Observable<Integer> observable = Observable
            .fromCallable(this::calculate)
            .subscribeOn(Schedulers.computation())
            .cache();

    observable
            .doOnNext(value -> {
                Log.e("log", "first onNext()");
            })
            .test().await();

    observable
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(value -> {
                Log.e("log", "second onNext()");
            })
            .test().await();

    Log.e("log", "this is first.");

}

结果:

E/log: onClick
E/log: first onNext()
E/log: this is first.
E/log: second onNext()

为了进一步说明这一点,如果您在第二个 onNext 链上添加一个 await(),您将永远不会完成,因为它将等待在您阻塞的同一线程中排队的事物。

更新:

当应用 observeOnAndroidSchedulers.mainThread() 调度程序时,下游事件通过在内部使用 postDelayed 发布到 MessageQueue。这就是为什么位于第二个 Observable 之后的代码在 Observable 完成的 之前执行 (或者如果我们使用 test().await() 则冻结)。一种可能的解决方案是使用 Subjects 作为数据源和订阅者之间的代理。查看这篇文章以获取更多信息 - Keep Your Main Thread Synchronous.

还有有用的文章:


解释为什么cache不切换线程:

您的 Observable 已经 returns 同步缓存了值,因为 cache 没有为每个订阅者订阅整个上游(因此在您的情况下它不会切换线程)。它执行一次,然后只记住项目的顺序。对于每个新订阅者,cache 只是重播它。


示例: (用 Kotlin 编写)

//here is the same logic as yours
private var observable: Observable<Int>? = null
    get() {
        if(field==null)
            field = Observable.fromCallable {
                System.out.println("callable: execution thread - ${Thread.currentThread().name}")
                Thread.sleep(1000)
                return@fromCallable 1000
            }
                    .subscribeOn(Schedulers.computation())
                    .doOnNext     { System.out.println("cached Observable: before cache() - doOnNext execution thread - ${Thread.currentThread().name}") }
                    .doOnComplete { System.out.println("cached Observable: before cache() - doOnComplete execution thread - ${Thread.currentThread().name}") }
                    .cache()
                    .doOnNext     { System.out.println("cached Observable: after cache() - doOnNext execution thread - ${Thread.currentThread().name}") }
                    .doOnComplete { System.out.println("cached Observable: after cache() - doOnComplete execution thread - ${Thread.currentThread().name}") }

        return field
    }

@Test
fun test() {
    observable!!
            .doOnSubscribe { System.out.println("first get: doOnSubscribe execution thread - ${Thread.currentThread().name}") }
            .doOnNext      { System.out.println("first get: doOnNext execution thread - ${Thread.currentThread().name}") }
            .doOnComplete  { System.out.println("first get: doOnComplete execution thread - ${Thread.currentThread().name}") }
            .test()
            .await()

    System.out.println("---------- first get executed ------------")

    observable!!
            .doOnSubscribe { System.out.println("second get: doOnSubscribe execution thread - ${Thread.currentThread().name}") }
            .doOnNext      { System.out.println("second get: doOnNext execution thread - ${Thread.currentThread().name}") }
            .doOnComplete  { System.out.println("second get: doOnComplete execution thread - ${Thread.currentThread().name}") }
            .subscribe()
}

输出:

first get: doOnSubscribe execution thread - main
callable: body execution thread - RxComputationThreadPool-1
cached Observable: before cache() - doOnNext execution thread - RxComputationThreadPool-1
cached Observable: after cache() - doOnNext execution thread - RxComputationThreadPool-1
first get: doOnNext execution thread - RxComputationThreadPool-1
cached Observable: before cache() - doOnComplete execution thread - RxComputationThreadPool-1
cached Observable: after cache() - doOnComplete execution thread - RxComputationThreadPool-1
first get: doOnComplete execution thread - RxComputationThreadPool-1
---------- first get executed ------------
second get: doOnSubscribe execution thread - main
cached Observable: after cache() - doOnNext execution thread - main
second get: doOnNext execution thread - main
cached Observable: after cache() - doOnComplete execution thread - main
second get: doOnComplete execution thread - main

如您所见,当有缓存值时,线程不会切换。

P.S。我假设你使用 RxJava2.