ObserveOn 不起作用

ObserveOn doesn't work

我开始为 Android 学习 Rx,但我遇到了一个错误。这是我的代码:

在我的 Subscriber 中,我打印以记录当前线程名称:

Subscriber<Integer> integerSubscriber = new Subscriber<Integer>() {
        // ... onCompleted, onError
        @Override
        public void onNext(Integer s) {
            Log.e("RX", "threadName " + getCurrThreadName());
        }
};      

这里我想运行在后台编码并从mainThread中的Observable获取数据:

 ArrayList list = new ArrayList();
 list.add(...) // creating data list

 Observable.from(list)
     .map(TransformFunc.getTransformer())
     .subscribeOn(AndroidSchedulers.mainThread())
     .observeOn(Schedulers.from(executor))
     .subscribe(integerSubscriber);

...

其他代码: 静态字符串 getCurrThreadName() { return Thread.currentThread().getName(); }

static class TransformFunc implements Func1<String, Integer> {

    private static TransformFunc instance;

    static TransformFunc getTransformer() {
       ... // return instance

        @Override
        public Integer call(@NonNull String s) {
            ...
                TimeUnit.SECONDS.sleep(4);
                Log.e("RX", "threadName " + getCurrThreadName());
            ...

            return s.length();
        }
}

但是当我在设备上 运行 这段代码时,我看到白屏 5-10 秒,并且在日志中是这样的:

12-02 16:34:39.374 26086-26086/com.shevart.fitnessnotes E/RX: threadName main
12-02 16:34:39.378 26086-26180/com.shevart.fitnessnotes E/RX: threadName pool-1-thread-2
12-02 16:34:43.375 26086-26086/com.shevart.fitnessnotes E/RX: threadName main
12-02 16:34:43.379 26086-26216/com.shevart.fitnessnotes E/RX: threadName pool-1-thread-3
12-02 16:34:47.376 26086-26086/com.shevart.fitnessnotes E/RX: threadName main
12-02 16:34:47.396 26086-26253/com.shevart.fitnessnotes E/RX: threadName pool-1-thread-4
12-02 16:34:51.376 26086-26086/com.shevart.fitnessnotes E/RX: threadName main
12-02 16:34:51.398 26086-26292/com.shevart.fitnessnotes E/RX: threadName pool-1-thread-5

为什么 onNext()map 中的 call() 之前调用?

您想在主线程上观察,而不是相反。

 .subscribeOn(Schedulers.from(executor))
 .observeOn(AndroidSchedulers.mainThread())

通过调用 .subscribeOn(),您基本上是在告诉 Observable 在特定的 Scheduler 上完成它的工作。通过 .observeOn() 另一方面,您告诉 Observable 向您发送该线程上的所有通知。

所以你需要交换你的订阅和观察调度器。

.subscribeOn(Schedulers.from(executor))
.observeOn(AndroidSchedulers.mainThread())