Combine RxTextView Observable and Retrofit Observable
As a first exercise with RxAndroid, I'm trying to implement a search box that triggers a REST call as the user types.
So far I have two working parts. The first observes the EditText:
RxTextView.textChangeEvents(searchEditText)
    .debounce(400, TimeUnit.MILLISECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<TextViewTextChangeEvent>() {
        @Override
        public void onCompleted() {
            Timber.d("onCompleted");
        }
        @Override
        public void onError(Throwable e) {
            Timber.e(e, "onError");
        }
        @Override
        public void onNext(TextViewTextChangeEvent e) {
            Timber.d("onNext" + e.text().toString());
        }
    });
The second calls the REST API through a Retrofit service:
APIManager.getService().searchRestaurants("test")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<Restaurant>>() {
        @Override
        public void onCompleted() {
            Timber.d("onCompleted");
        }
        @Override
        public void onError(Throwable e) {
            Timber.e(e, "onError");
        }
        @Override
        public void onNext(List<Restaurant> restaurants) {
            Timber.d("onNext");
            for (Restaurant restaurant : restaurants) {
                Timber.d(restaurant.getId() + ": " + restaurant.getName());
            }
        }
    });
My problem is combining the two parts. I tried the flatMap operator, like this:
RxTextView.textChangeEvents(searchEditText)
    .debounce(400, TimeUnit.MILLISECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(new Func1<TextViewTextChangeEvent, Observable<List<Restaurant>>>() {
        @Override
        public Observable<List<Restaurant>> call(TextViewTextChangeEvent txtChangeEvt) {
            return APIManager.getService().searchRestaurants(txtChangeEvt.text().toString());
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<Restaurant>>() {
        @Override
        public void onCompleted() {
            Timber.d("onCompleted");
        }
        @Override
        public void onError(Throwable e) {
            Timber.e(e, "onError");
        }
        @Override
        public void onNext(List<Restaurant> restaurants) {
            Timber.d("onNext");
            for (Restaurant restaurant : restaurants) {
                Timber.d(restaurant.getId() + ": " + restaurant.getName());
            }
        }
    });
When I do this, I get the following exception:
java.lang.IllegalStateException: Must be called from the main thread. Was: Thread[RxCachedThreadScheduler-1,5,main]
at com.jakewharton.rxbinding.internal.Preconditions.checkUiThread(Preconditions.java:28)
at com.jakewharton.rxbinding.widget.TextViewTextChangeEventOnSubscribe.call(TextViewTextChangeEventOnSubscribe.java:21)
at com.jakewharton.rxbinding.widget.TextViewTextChangeEventOnSubscribe.call(TextViewTextChangeEventOnSubscribe.java:12)
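So I tried to fix it by calling subscribeOn(AndroidSchedulers.mainThread()) instead, but then, of course, I get a NetworkOnMainThreadException.
So how should I do this? What is the proper way to combine different Observables that should run on different threads?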
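Just remove the first .observeOn(AndroidSchedulers.mainThread()). Take a look at this example, which shows how subscribeOn and observeOn decide the thread each step runs on: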
Observable.just(1) // 1 will be emitted in the IO thread pool
    .subscribeOn(Schedulers.io())
    .flatMap(...) // will be executed in the IO thread pool
    .observeOn(Schedulers.computation())
    .flatMap(...) // will be executed in the computation thread pool
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(); // will be executed in the Android main thread (if you're running your code on Android)
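For the search box itself, one reading of the stack trace is that the outer subscribeOn(Schedulers.io()) also moves the subscription to RxTextView onto an io thread (RxCachedThreadScheduler-1), which is what checkUiThread rejects. If that reading is right, applying subscribeOn(Schedulers.io()) only to the Observable returned inside flatMap would keep the text-change source on the main thread while the request runs elsewhere. The sketch below is a plain-Java analogy of that shape built on java.util.concurrent, not RxJava: ThreadHopSketch, MAIN, IO and stageThreads are hypothetical names, and the two executors only stand in for the Android main thread and the io scheduler.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopSketch {
    // Assumption: a single-thread executor plays the role of the Android main thread.
    static final ExecutorService MAIN =
            Executors.newSingleThreadExecutor(r -> daemon(r, "main-sketch"));
    // Assumption: a cached pool plays the role of Schedulers.io().
    static final ExecutorService IO =
            Executors.newCachedThreadPool(r -> daemon(r, "io-sketch"));

    // Daemon threads so the JVM can exit without shutting the executors down.
    private static Thread daemon(Runnable task, String name) {
        Thread t = new Thread(task, name);
        t.setDaemon(true);
        return t;
    }

    private static String here() {
        return Thread.currentThread().getName();
    }

    /** Returns the thread name seen at each stage: source, request, result delivery. */
    public static List<String> stageThreads() {
        return CompletableFuture
                // Stage 1: the source (the text field in the question) is touched on "main" only.
                .supplyAsync(ThreadHopSketch::here, MAIN)
                // Stage 2: only the inner work (the network call in the question) hops to "io".
                .thenApplyAsync(source -> new String[] {source, here()}, IO)
                // Stage 3: the result is handed back to "main" for display.
                .thenApplyAsync(s -> Arrays.asList(s[0], s[1], here()), MAIN)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(stageThreads());
    }
}
```

Running main should print [main-sketch, io-sketch, main-sketch]: the source never leaves the main stand-in, only the inner step runs on the io stand-in, and delivery returns to main. The point of the design is that the thread hop is attached to the inner step alone rather than to the whole chain.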