为什么这总是在主线程上运行?

Why this always runs on main thread?

我试图实现一个非常简单的例子RxAndroid2。当我尝试 运行 这段代码时,它让我感到困惑。

    ArrayList<Integer> arr = new ArrayList<>();
    arr.add(0);
    arr.add(1);
    arr.add(2);
    arr.add(3);
    arr.add(4);
    arr.add(5);
    arr.add(6);

    Observable.fromIterable(arr)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    LogUtil.logD(TAG, Thread.currentThread().getName());
                }

                @Override
                public void onNext(Integer integer) {
                    LogUtil.logD(TAG, Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                    LogUtil.logD(TAG, Thread.currentThread().getName());
                }

                @Override
                public void onComplete() {
                    LogUtil.logD(TAG, Thread.currentThread().getName());
                }
            });

并且日志总是告诉我它 运行 仅在 main 线程上。 对此有什么想法吗?如果您能向我解释更多有关 observeOn() 和 subscribeOn() 的信息,那就太好了。

已编辑:这段代码仍然给我所有主线程。

ArrayList<Integer> arr = new ArrayList<>();
    arr.add(0);
    arr.add(1);
    arr.add(2);
    arr.add(3);
    arr.add(4);
    arr.add(5);
    arr.add(6);

    Observable.fromIterable(arr)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    LogUtil.logD(TAG, Thread.currentThread().getName());
                }
            })
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    LogUtil.logD(TAG, Thread.currentThread().getName());
                }
            })
            .subscribe(integer -> {
                LogUtil.logD(TAG, Thread.currentThread().getName());
            });

你有observeOn(AndroidSchedulers.mainThread())

observeOn()javadoc 明确指出:

Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer with Flowable.bufferSize() "island size".

那么为什么您对主线程上发生的事情感到惊讶?!

并且要清楚这一点:您的代码将 Observable 配置为使用 主线程。

进一步研究相关的 API,我 猜测 您必须在您的调度程序上调用 start(),例如:

Scheduler newThreadScheduler = Schedulers.newThread();
newThreadScheduler.start();
Observable.fromIterable(arr)
        .subscribeOn(newThreadScheduler)
        .observeOn(newThreadScheduler)

您可以移动运算符:

Observable.fromIterable(arr)
        .subscribeOn(Schedulers.newThread())
                                                      // <------------------------
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                LogUtil.logD(TAG, Thread.currentThread().getName());
            }
        })
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtil.logD(TAG, Thread.currentThread().getName());
            }
        })
        .observeOn(AndroidSchedulers.mainThread())  // <--------------------------
        .subscribe(integer -> {
            LogUtil.logD(TAG, Thread.currentThread().getName());
        });

这不再在主线程上执行 doOnSubscibedoOnNext 中的代码,只有 subscribe 处理程序在主线程上执行。