RxAndroid Observable skips doOnComplete()
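I'm calling multiple Retrofit APIs concurrently and waiting for all of them to finish with Observable.zip(). That part works correctly. Now I want to add progress reporting.
Here is my implementation: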
private void preFetchData() {
    ApiInterface apiService1 = ApiClient.getWooRxClient().create(ApiInterface.class);
    ApiInterface apiService2 = ApiClient.getRxClient().create(ApiInterface.class);

    Map<String, String> map1 = new HashMap<>();
    map1.put("on_sale", "true");
    Observable<List<Product>> call1 = apiService1.getProducts1(map1)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnComplete(() -> {
                progress += 18;
                Log.e("Progress1", progress + "");
                mProgressBar.setProgress(progress);
                mProgressBar.setProgressText(progress + "%");
            });

    Map<String, String> map2 = new HashMap<>();
    map2.put("featured", "true");
    Observable<List<Product>> call2 = apiService1.getProducts1(map2)
            .subscribeOn(Schedulers.io())
            .delaySubscription(100, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnComplete(() -> {
                progress += 18;
                Log.e("Progress2", progress + "");
                mProgressBar.setProgress(progress);
                mProgressBar.setProgressText(progress + "%");
            });

    Map<String, String> map3 = new HashMap<>();
    map3.put("page", "1");
    map3.put("sort", "rating");
    map3.put("per_page", "10");
    Observable<List<Product>> call3 = apiService2.getCustomProducts1(map3)
            .subscribeOn(Schedulers.io())
            .delaySubscription(200, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnComplete(() -> {
                progress += 18;
                Log.e("Progress3", progress + "");
                mProgressBar.setProgress(progress);
                mProgressBar.setProgressText(progress + "%");
            });

    Map<String, String> map4 = new HashMap<>();
    map4.put("page", "1");
    map4.put("sort", "popularity");
    map4.put("per_page", "10");
    Observable<List<Product>> call4 = apiService2.getCustomProducts1(map4)
            .subscribeOn(Schedulers.io())
            .delaySubscription(300, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnComplete(() -> {
                progress += 18;
                Log.e("Progress4", progress + "");
                mProgressBar.setProgress(progress);
                mProgressBar.setProgressText(progress + "%");
            });

    Observable<ResponseBody> call5 = apiService2.getCurrencySymbol()
            .subscribeOn(Schedulers.io())
            .delaySubscription(400, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnComplete(() -> {
                progress += 10;
                Log.e("Progress5", progress + "");
                mProgressBar.setProgress(progress);
                mProgressBar.setProgressText(progress + "%");
            });

    Observable<List<Category>> call6 = apiService1.getAllCategories()
            .subscribeOn(Schedulers.io())
            .delaySubscription(500, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnComplete(() -> {
                progress += 18;
                Log.e("Progress6", progress + "");
                mProgressBar.setProgress(progress);
                mProgressBar.setProgressText(progress + "%");
            });

    Observable<CombinedHomePage> combined = Observable.zip(call1, call2, call3, call4, call5, call6, CombinedHomePage::new);
    disposable = combined.subscribe(this::successHomePage, this::throwableError);
}

private void successHomePage(CombinedHomePage o) {
    Log.e("Response", "SUCCESS " + o.featuredProductList.size());
    Log.e("Response", "SUCCESS " + o.saleProductList.size());
    Log.e("Response", "SUCCESS " + o.topRatedProductList.size());
    Log.e("Response", "SUCCESS " + o.topSellerProductList.size());
    Log.e("Response", "SUCCESS " + o.CURRENCY);
    Log.e("Response", "SUCCESS " + o.categoryList.size());
}

private void throwableError(Throwable t) {
    Log.e("Response", "Fail");
}
Here is the Logcat output:
First Run
E/Progress5: 10.0
E/Progress2: 28.0
E/Progress1: 46.0
E/Progress6: 64.0
E/Progress3: 82.0
E/Response: Featured List Size 5
E/Response: Sale List Size 7
E/Response: Rated List Size 10
E/Response: Seller List Size 10
E/Response: Currency $
E/Response: Category List Size 9
Second Run
E/Progress5: 10.0
E/Progress2: 28.0
E/Progress1: 46.0
E/Progress6: 64.0
E/Progress4: 82.0
E/Response: Featured List Size 5
E/Response: Sale List Size 7
E/Response: Rated List Size 10
E/Response: Seller List Size 10
E/Response: Currency $
E/Response: Category List Size 9
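In the first run Progress4 is skipped, and in the second run Progress3 is skipped (all API calls complete successfully).
Any idea why this happens?
Any help would be appreciated.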
This behavior is described in the Javadoc of zip():
The operator subscribes to its sources in order they are specified and completes eagerly if one of the sources is shorter than the rest while disposing the other sources. Therefore, it is possible those other sources will never be able to run to completion (and thus not calling doOnComplete()). This can also happen if the sources are exactly the same length; if source A completes and B has been consumed and is about to complete, the operator detects A won't be sending further values and it will dispose B immediately. For example:
zip(Arrays.asList(range(1, 5).doOnComplete(action1),
range(6, 5).doOnComplete(action2)), (a) -> a)
action1 will be called but action2 won't.
To work around this termination property, use doOnDispose(Action) as well or use using() to do cleanup in case of completion or a dispose() call.
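For the progress bar in the question, one possible fix is to count in doOnNext() instead of doOnComplete(). This is only a sketch and rests on an assumption: each Retrofit Observable emits exactly one item, so "item received" and "call finished" mean the same thing for progress purposes. zip() needs every item before it can combine them, so doOnNext() runs for every source even when zip() disposes a source before its onComplete is delivered. The class below reproduces the Javadoc example with plain range() sources. The class name ZipProgressDemo and the counters are made up for illustration, and the import assumes RxJava 2 (with RxJava 3 it would be io.reactivex.rxjava3.core.Observable).

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the question's code.
class ZipProgressDemo {

    /** Returns { number of doOnNext calls, number of doOnComplete calls } across both sources. */
    static int[] run() {
        AtomicInteger nextCalls = new AtomicInteger();
        AtomicInteger completeCalls = new AtomicInteger();

        // Same shape as the Javadoc example: two synchronous sources of equal length.
        Observable<Integer> first = Observable.range(1, 5)
                .doOnNext(v -> nextCalls.incrementAndGet())
                .doOnComplete(completeCalls::incrementAndGet);

        Observable<Integer> second = Observable.range(6, 5)
                .doOnNext(v -> nextCalls.incrementAndGet())
                .doOnComplete(completeCalls::incrementAndGet);

        // Both sources are synchronous, so subscribe() returns only after zip() has terminated.
        Observable.zip(first, second, (a, b) -> a + b)
                .subscribe(sum -> System.out.println("zipped: " + sum));

        return new int[] { nextCalls.get(), completeCalls.get() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        // Every item passes through doOnNext, but zip() disposes the second
        // source before its onComplete, so only one completion is counted.
        System.out.println("doOnNext calls: " + counts[0]);
        System.out.println("doOnComplete calls: " + counts[1]);
    }
}
```

Applied to the question, that would mean changing each `.doOnComplete(() -> { ... })` to `.doOnNext(ignored -> { ... })` with the same body. If the cleanup has to run on both completion and disposal, doOnDispose() or using(), as the Javadoc suggests, is the more general tool.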