即使在 onComplete() 之后也保持流活动
Keep stream alive even after onComplete()
我目前面临两个问题。
1) 一旦 RetrofitProvider.getInstance().getCurrentWeather(.....)
行被调用,网络调用就完成了。它怎么能延迟到观察者连接到它。
2) 调用 weatherInfoPublisher.onComplete()
后,下次我在此对象上调用 onComplete 时,不会调用新观察者的 onNext。
public Observable<LinkedList<WeatherInfo>> getWeatherData(final String payload, final TempUnit tempUnit) {
PublishSubject weatherInfoPublisher = PublishSubject.create();
RetrofitProvider.getInstance().getCurrentWeather(payload + ",us", translateTempUnit(tempUnit))
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String todayResponse) throws Exception {
Log.d(TAG, "Received today weather: " + todayResponse);
parseTodayData(todayResponse, weatherDataList);
return RetrofitProvider.getInstance().getForecastWeather(
payload + ",us", translateTempUnit(tempUnit), FORECAST_DAYS);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(String futureResponse) {
Log.d(TAG, "Received future weather: " + futureResponse);
parseFutureData(futureResponse, weatherDataList);
weatherInfoPublisher.onNext(weatherDataList);
weatherInfoPublisher.onComplete();
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "The error is, " + e.getMessage());
}
@Override
public void onComplete() {
}
});
return weatherInfoPublisher;
}
这是一个单例 class,整个实现已在 Github Link 中提供。
How can it be deferred till the observer is connected to it.
不要订阅此方法中的那个可观察对象。相反 return 客户端可以观察到。一旦订阅了 observable - 就会执行一个请求。
the next time I call onComplete
on this object the new observer's onNext
is not getting called.
参见 reactive stream specs:如果一个流完成 - 它永远无法继续,那就是一个终止事件。
我目前面临两个问题。
1) 一旦 RetrofitProvider.getInstance().getCurrentWeather(.....)
行被调用,网络调用就完成了。它怎么能延迟到观察者连接到它。
2) 调用 weatherInfoPublisher.onComplete()
后,下次我在此对象上调用 onComplete 时,不会调用新观察者的 onNext。
public Observable<LinkedList<WeatherInfo>> getWeatherData(final String payload, final TempUnit tempUnit) {
PublishSubject weatherInfoPublisher = PublishSubject.create();
RetrofitProvider.getInstance().getCurrentWeather(payload + ",us", translateTempUnit(tempUnit))
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String todayResponse) throws Exception {
Log.d(TAG, "Received today weather: " + todayResponse);
parseTodayData(todayResponse, weatherDataList);
return RetrofitProvider.getInstance().getForecastWeather(
payload + ",us", translateTempUnit(tempUnit), FORECAST_DAYS);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(String futureResponse) {
Log.d(TAG, "Received future weather: " + futureResponse);
parseFutureData(futureResponse, weatherDataList);
weatherInfoPublisher.onNext(weatherDataList);
weatherInfoPublisher.onComplete();
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "The error is, " + e.getMessage());
}
@Override
public void onComplete() {
}
});
return weatherInfoPublisher;
}
这是一个单例 class,整个实现已在 Github Link 中提供。
How can it be deferred till the observer is connected to it.
不要订阅此方法中的那个可观察对象。相反 return 客户端可以观察到。一旦订阅了 observable - 就会执行一个请求。
the next time I call
onComplete
on this object the new observer'sonNext
is not getting called.
参见 reactive stream specs:如果一个流完成 - 它永远无法继续,那就是一个终止事件。