函数不在后台线程上执行

Function not executing on background thread

我正在尝试将 RX 应用于 Android。我想要在单击按钮时从 Web 下载内容并显示它。

我的问题是 HttpClient.connect() 在 mainThread 而不是后台线程上执行。 对 HttpClient.connect() 的调用作为传递给 Observable.map()

的函数执行
Observable<Integer> dayDeltas =  Obs.obsToSequence(Obs.Observable(textView)); //transforms click events to observable
Observable<String> dates = dayDeltas.map(...).map(...)
dates.map(Obs.dateToWebPage()) // calls http.connect()
     .map(Obs.parseEvents())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribeOn(Schedulers.newThread())
     .subscribe(updateTextView(textView));

public static Observable<Object> Observable(final TextView text) {
        return Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(final Subscriber<? super Object> subscriber) {
                final Object event = new Object();
                text.setOnClickListener(new View.OnClickListener() {
                    @Override
                    public void onClick(View v) {
                        Log.e("click", "click");
                        subscriber.onNext(event);
                    }
                });
            }
        });

现在我天真的解释是,因为我有 .subscribeOn(Schedulers.newThread()) observable 上的每个 function/operator 都应该在一个新线程中执行,包括 .map(f)。显然这不是正在发生的事情,那么这条链的哪一部分在新线程上执行?

subscribeOn 是用来触发订阅副作用的。在您的设置中,它将注册回调以捕获新线程上的按钮按下,但是当按下发生时,onNext 发射由主线程触发。然后,包括网络连接在内的链在主线程上执行。

您必须在连接方法之前放置一个新的 observeOn(Schedulers.io()) 以确保按钮按下事件的接收发生在主线程之外。

编辑:

//transforms click events to observable
Observable<Integer> dayDeltas =  Obs.obsToSequence(Obs.Observable(textView));

Observable<String> dates = dayDeltas.map(...).map(...)
dates
 .observeOn(Schedulers.io()) // <------------------------------------------ add
 .map(Obs.dateToWebPage()) // calls http.connect()
 .map(Obs.parseEvents())
 .observeOn(AndroidSchedulers.mainThread())
 //.subscribeOn(Schedulers.newThread()) // <------------------------------- remove
 .subscribe(updateTextView(textView));

仔细阅读Scheduling and Threading RX doc后,我有了解决方案:

 dates
       .observeOn(Schedulers.io())
       .subscribeOn(AndroidSchedulers.mainThread())
       .map(Obs.dateToWebPage())

在我的原始代码中,传递给 observeOn/subscribeOn 的调度程序被颠倒了并且在错误的位置。