RxAnadroid 连接流
RxAnadroid concatenating streams
我正在制作一个简单的天气应用程序来学习 RxAndroid,但我遇到了以下问题。
我首先加载我感兴趣的城市,然后询问每个城市的天气。
getCitiesUseCase returns 我从数据库加载的 Observable<List<City>>
。我将该城市列表发送到我的视图以显示它们,然后在订阅者内部单独询问天气(平面图)。
Subscription subscription = getCitiesUseCase.execute().flatMap(new Func1<List<City>, Observable<City>>() {
@Override
public Observable<City> call(List<City> cities) {
citiesView.addCities(cities);
return Observable.from(cities);
}
}).subscribe(new Subscriber<City>() {
@Override
public void onCompleted() {
subscriptions.remove(this);
this.unsubscribe();
}
@Override
public void onError(Throwable e) {
Log.e(this.getClass().getSimpleName(), e.toString());
}
@Override
public void onNext(City city) {
getCityWeatherUseCase.setLatLon(city.getLat().toString(), city.getLon().toString(), city.getId());
getCityWeather(city);
}
});
subscriptions.add(subscription);
现在 getCityWeather() 方法如下所示:
private void getCityWeather(final City city) {
subscriptions.add(getCityWeatherUseCase.execute().subscribe(new Subscriber<CityWeather>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.e("error", e.toString());
}
@Override
public void onNext(CityWeather cityWeather) {
city.setCityWeather(cityWeather);
citiesView.updateCity(city);
}
}));
}
一切正常,如预期的那样,但我在订阅者内部订阅观察者的事实感觉不对。我知道 rxJava 可以让你玩弄订阅者来防止这种事情发生,但我真的不知道如何进一步改进我的代码。请记住,我需要一个城市才能询问它的天气。
圣诞快乐!
一种方法如下。 (我正在使用 retrolambda - 所以无论你看到 ->
,只需替换为新的匿名内部 class)。
请注意,我正在使用 flatMap
来启动天气数据请求,而不是像您的问题所暗示的那样 Observable.concat
。这样做的原因是您的调度程序(例如 io()
)将并行处理这些并在结果可用时发送结果。然而,对于 Observable.concat
,这些请求将被序列化,因此它们将被迫一次发生一个 - 抵消了像 io()
.
这样的线程池的好处
private class City {
public String name;
public City(String name) {
this.name = name;
}
public void setWeather(Weather weather) { /*...*/ }
}
private class Weather {
public String status;
public Weather(String status) {
this.status = status;
}
}
private Observable<Weather> getWeather(City city) {
// call your weather API here..
return Observable.just(new Weather("Sunny"));
}
@Test
public void test() {
Observable<List<City>> citiesObs = Observable.create(new Observable.OnSubscribe<List<City>>() {
@Override
public void call(Subscriber<? super List<City>> subscriber) {
// do work
final List<City> cities = new ArrayList<>();
cities.add(new City("Paris"));
cities.add(new City("Tokyo"));
cities.add(new City("Oslo"));
// send results
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(cities);
subscriber.onCompleted();
}
}
});
Observable<City> obs = citiesObs
// inject a side effect
.doOnNext(list -> {
// pass `list` to your view here
})
// turn Observable<Iterable<T>> into Observable<T>
.flatMapIterable(list -> list)
// Map a city to an observable that fetches Weather data
// Your scheduler can take care of these at once.
.flatMap(city -> {
return getWeather(city)
// another side effect
.doOnNext(weather -> {
city.setWeather(weather);
})
// map baack to city, just for the heck of it
.map($ -> city);
});
TestSubscriber sub = TestSubscriber.create();
obs.subscribe(sub);
sub.awaitTerminalEvent();
sub.assertValueCount(3);
}
另请注意,为了利用 io()
,您需要添加对 subscribeOn(Schedulers.io())
的调用以告知可观察对象开始在 io
线程上工作水池。当您想将控制传递给另一个线程时,例如您的视图,您可以在副作用(或映射)之前插入一个 observeOn(AndroidSchedulers.mainThread())
。如果您想将天气调用的控制权返回到后台线程,则可以在 flatMap
到 getWeather(City)
.[=24= 之前添加另一个对 observeOn(Schedulers.io())
的调用]
我正在制作一个简单的天气应用程序来学习 RxAndroid,但我遇到了以下问题。
我首先加载我感兴趣的城市,然后询问每个城市的天气。
getCitiesUseCase returns 我从数据库加载的 Observable<List<City>>
。我将该城市列表发送到我的视图以显示它们,然后在订阅者内部单独询问天气(平面图)。
Subscription subscription = getCitiesUseCase.execute().flatMap(new Func1<List<City>, Observable<City>>() {
@Override
public Observable<City> call(List<City> cities) {
citiesView.addCities(cities);
return Observable.from(cities);
}
}).subscribe(new Subscriber<City>() {
@Override
public void onCompleted() {
subscriptions.remove(this);
this.unsubscribe();
}
@Override
public void onError(Throwable e) {
Log.e(this.getClass().getSimpleName(), e.toString());
}
@Override
public void onNext(City city) {
getCityWeatherUseCase.setLatLon(city.getLat().toString(), city.getLon().toString(), city.getId());
getCityWeather(city);
}
});
subscriptions.add(subscription);
现在 getCityWeather() 方法如下所示:
private void getCityWeather(final City city) {
subscriptions.add(getCityWeatherUseCase.execute().subscribe(new Subscriber<CityWeather>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.e("error", e.toString());
}
@Override
public void onNext(CityWeather cityWeather) {
city.setCityWeather(cityWeather);
citiesView.updateCity(city);
}
}));
}
一切正常,如预期的那样,但我在订阅者内部订阅观察者的事实感觉不对。我知道 rxJava 可以让你玩弄订阅者来防止这种事情发生,但我真的不知道如何进一步改进我的代码。请记住,我需要一个城市才能询问它的天气。 圣诞快乐!
一种方法如下。 (我正在使用 retrolambda - 所以无论你看到 ->
,只需替换为新的匿名内部 class)。
请注意,我正在使用 flatMap
来启动天气数据请求,而不是像您的问题所暗示的那样 Observable.concat
。这样做的原因是您的调度程序(例如 io()
)将并行处理这些并在结果可用时发送结果。然而,对于 Observable.concat
,这些请求将被序列化,因此它们将被迫一次发生一个 - 抵消了像 io()
.
private class City {
public String name;
public City(String name) {
this.name = name;
}
public void setWeather(Weather weather) { /*...*/ }
}
private class Weather {
public String status;
public Weather(String status) {
this.status = status;
}
}
private Observable<Weather> getWeather(City city) {
// call your weather API here..
return Observable.just(new Weather("Sunny"));
}
@Test
public void test() {
Observable<List<City>> citiesObs = Observable.create(new Observable.OnSubscribe<List<City>>() {
@Override
public void call(Subscriber<? super List<City>> subscriber) {
// do work
final List<City> cities = new ArrayList<>();
cities.add(new City("Paris"));
cities.add(new City("Tokyo"));
cities.add(new City("Oslo"));
// send results
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(cities);
subscriber.onCompleted();
}
}
});
Observable<City> obs = citiesObs
// inject a side effect
.doOnNext(list -> {
// pass `list` to your view here
})
// turn Observable<Iterable<T>> into Observable<T>
.flatMapIterable(list -> list)
// Map a city to an observable that fetches Weather data
// Your scheduler can take care of these at once.
.flatMap(city -> {
return getWeather(city)
// another side effect
.doOnNext(weather -> {
city.setWeather(weather);
})
// map baack to city, just for the heck of it
.map($ -> city);
});
TestSubscriber sub = TestSubscriber.create();
obs.subscribe(sub);
sub.awaitTerminalEvent();
sub.assertValueCount(3);
}
另请注意,为了利用 io()
,您需要添加对 subscribeOn(Schedulers.io())
的调用以告知可观察对象开始在 io
线程上工作水池。当您想将控制传递给另一个线程时,例如您的视图,您可以在副作用(或映射)之前插入一个 observeOn(AndroidSchedulers.mainThread())
。如果您想将天气调用的控制权返回到后台线程,则可以在 flatMap
到 getWeather(City)
.[=24= 之前添加另一个对 observeOn(Schedulers.io())
的调用]