RxAnadroid 连接流

RxAnadroid concatenating streams

我正在制作一个简单的天气应用程序来学习 RxAndroid,但我遇到了以下问题。 我首先加载我感兴趣的城市,然后询问每个城市的天气。 getCitiesUseCase returns 我从数据库加载的 Observable<List<City>>。我将该城市列表发送到我的视图以显示它们,然后在订阅者内部单独询问天气(平面图)。

Subscription subscription = getCitiesUseCase.execute().flatMap(new Func1<List<City>, Observable<City>>() {
        @Override
        public Observable<City> call(List<City> cities) {
            citiesView.addCities(cities);
            return Observable.from(cities);
        }
    }).subscribe(new Subscriber<City>() {
        @Override
        public void onCompleted() {
            subscriptions.remove(this);
            this.unsubscribe();
        }

        @Override
        public void onError(Throwable e) {
            Log.e(this.getClass().getSimpleName(), e.toString());
        }

        @Override
        public void onNext(City city) {
            getCityWeatherUseCase.setLatLon(city.getLat().toString(), city.getLon().toString(), city.getId());
            getCityWeather(city);
        }
    });

    subscriptions.add(subscription);

现在 getCityWeather() 方法如下所示:

    private void getCityWeather(final City city) {
    subscriptions.add(getCityWeatherUseCase.execute().subscribe(new Subscriber<CityWeather>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            Log.e("error", e.toString());
        }

        @Override
        public void onNext(CityWeather cityWeather) {
            city.setCityWeather(cityWeather);
            citiesView.updateCity(city);
        }
    }));
}

一切正常,如预期的那样,但我在订阅者内部订阅观察者的事实感觉不对。我知道 rxJava 可以让你玩弄订阅者来防止这种事情发生,但我真的不知道如何进一步改进我的代码。请记住,我需要一个城市才能询问它的天气。 圣诞快乐!

一种方法如下。 (我正在使用 retrolambda - 所以无论你看到 ->,只需替换为新的匿名内部 class)。

请注意,我正在使用 flatMap 来启动天气数据请求,而不是像您的问题所暗示的那样 Observable.concat 。这样做的原因是您的调度程序(例如 io())将并行处理这些并在结果可用时发送结果。然而,对于 Observable.concat,这些请求将被序列化,因此它们将被迫一次发生一个 - 抵消了像 io().

这样的线程池的好处
private class City {
    public String name;
    public City(String name) {
        this.name = name;
    }
    public void setWeather(Weather weather) { /*...*/ }
}

private class Weather {
    public String status;
    public Weather(String status) {
        this.status = status;
    }
}

private Observable<Weather> getWeather(City city) {
    // call your weather API here..
    return Observable.just(new Weather("Sunny"));
}

@Test
public void test() {
    Observable<List<City>> citiesObs = Observable.create(new Observable.OnSubscribe<List<City>>() {
        @Override
        public void call(Subscriber<? super List<City>> subscriber) {
            // do work
            final List<City> cities = new ArrayList<>();
            cities.add(new City("Paris"));
            cities.add(new City("Tokyo"));
            cities.add(new City("Oslo"));

            // send results
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext(cities);
                subscriber.onCompleted();
            }
        }
    });

    Observable<City> obs = citiesObs

            // inject a side effect
            .doOnNext(list -> {
                // pass `list` to your view here
            })

            // turn Observable<Iterable<T>> into Observable<T>
            .flatMapIterable(list -> list)

            // Map a city to an observable that fetches Weather data
            // Your scheduler can take care of these at once.
            .flatMap(city -> {
                return getWeather(city)

                        // another side effect
                        .doOnNext(weather -> {
                            city.setWeather(weather);
                        })

                        // map baack to city, just for the heck of it
                        .map($ -> city);

            });

    TestSubscriber sub = TestSubscriber.create();
    obs.subscribe(sub);
    sub.awaitTerminalEvent();
    sub.assertValueCount(3);
}

另请注意,为了利用 io(),您需要添加对 subscribeOn(Schedulers.io()) 的调用以告知可观察对象开始在 io 线程上工作水池。当您想将控制传递给另一个线程时,例如您的视图,您可以在副作用(或映射)之前插入一个 observeOn(AndroidSchedulers.mainThread()) 。如果您想将天气调用的控制权返回到后台线程,则可以在 flatMapgetWeather(City).[=24= 之前添加另一个对 observeOn(Schedulers.io()) 的调用]