Schedulers.io() 没有返回到主线程

Schedulers.io() not returning to main thread

我正在使用 RxParse 来解析查询的异步加载,但是当我使用 subscribeOn(Schedulers.io()) 订阅我的可观察对象时,我的 onCompleted 方法从未在主线程上被调用。取而代之的是,我的 onCompleted 方法在工作线程池内部被调用。如果我使用 observeOn(AndroidSchedulers.mainThread) 一切都会正常工作,但我的 onNextMethod 也会在主线程上调用,我不想要它。

我的代码有问题吗?

我的代码有什么问题吗?

ParseObservable.find(myQuery)
    .map(myMapFunc())
    .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
    .subscribe(
        new Subscriber<MyObj>() {
           @Override
            public void onError(Throwable e) {
                Log.e("error","error",e);
            }

            @Override
            public void onNext(T t) {
                // ... worker thread (but here is ok)
            }

            public void onCompleted() {
                // ... worker thread again instead of mainThread
            }
        }
    )
);

首先你要明白subscribeOn()observeOn()的区别。这是两个完全不同的运算符,它们影响 Rx 链的不同部分。

subscribeOn() 指定 Observable 将在哪里工作。它不会影响 onNext()onError()onComplete() 的执行位置。

observeOn() 指定执行回调(例如 onNext())的位置。它不会影响您的 Observable 在哪里工作。

所有回调将发生在同一个线程上。您不能通过任何 RxJava API 指定某些回调发生在一个线程上,而某些回调发生在另一个线程上。如果那是您想要的行为,您将必须在回调中自己实现它。

不幸的是,所有方法的订阅都在同一个线程中(onNextonErroronCompleted

但是您可以在 Schedulers.io()onNext(T t) 方法中观察,创建一个新的 Observable 以在 MainThread 中监听,如下所示:

ParseObservable.find(myQuery)
    .map(myMapFunc())
    .subscribeOn(Schedulers.io())
    .subscribe(
        new Subscriber<MyObj>() {
           @Override
            public void onError(Throwable e) {
                Log.e("error","error",e);
            }

            @Override
            public void onNext(T t) {
                Observable.just(t)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe((t) -> {
                         // do something in MainThread
                    })
            }

            public void onCompleted() {
                // ... worker thread again instead of mainThread
            }
        }
    )
);

希望对您有所帮助!

在这种情况下,我建议使用 "side action" 运算符。在我看来,这是一个比使用嵌套可观察对象稍微优雅的解决方案:

ParseObservable.find(myQuery)
        .map(myMapFunc())
        .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
        .doOnCompleted(() -> onCompleteAction())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(value -> onNext(value))
        .subscribe();

不建议在订阅中订阅。

subscribeOn 决定当观察者订阅时 Observable 链开始的位置。

observeOn 可以在整个可观察链的不同点使用(如果需要,可以多次使用)以 在线程之间传递控制权 。 (您可以通过检查您是否在每个块中的主线程上来验证这一点)。

ParseObservable.find(myQuery)
    .map(myMapFunc())

    // Added this:
    .doOnNext(obj -> {
        // NOTE: This will happen on your `subscribeOn` scheduler 
        // Do something with `obj` here while on worker thread
    }

    .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))

    // Added this:
    .observeOn(AndroidSchedulers.mainThread())    

    .subscribe(new Subscriber<>() {
        next -> {
            // NOTE: This will happen on the main thread
        },
        error -> {
            Log.e("error","error",e);
            // NOTE: This will happen on the main thread
        },
        () -> {
            // NOTE: This will happen on the main thread
        }
    });