Retrofit 和 RxJava:结合 Flowable 和 Observable 得到结果

Retrofit and RxJava: Combine Flowable with Observable to get result

我想结合一个 Flowable 和一个 Observable 来从这些来源创建一个模型。

我从 Room 得到 CartModel 项目,该模型包含产品 ID 和数量。

  @Override
        public Flowable<List<CartModel>> getCartItems() {
            return appLocalDataStore.getCartItems();
        }

之后我有一个函数,上面函数的每个 id 组成一对数量和产品详细信息。

@Override
public Observable<List<Pair<WsProductDetails, Integer>>> getCartWithProductData() {
    return appLocalDataStore.getCartItems().toObservable().flatMap(new Function<List<CartModel>, ObservableSource<CartModel>>() {
        @Override
        public ObservableSource<CartModel> apply(List<CartModel> cartModels) throws Exception {
            return Observable.fromIterable(cartModels);
        }
    }).flatMap(new Function<CartModel, ObservableSource<Pair<WsProductDetails, Integer>>>() {
        @Override
        public ObservableSource<Pair<WsProductDetails, Integer>> apply(final CartModel cartModel) throws Exception {
            return appRemoteDataStore.getProductBySKU(cartModel.getId()).map(new Function<WsProductDetails, Pair<WsProductDetails, Integer>>() {
                @Override
                public Pair<WsProductDetails, Integer> apply(WsProductDetails wsProductDetails) throws Exception {
                    Log.d("TestCartItems", "SKU" + wsProductDetails.getSku() + "  quantity" + cartModel.getQuantity());
                    return new Pair<>(wsProductDetails, cartModel.getQuantity());
                }
            });
        }
    }).toList().toObservable();
}   

 @GET(PATH_PRODUCT_DETAILS_BY_SKY)
    Observable<WsProductDetails> getProductBySKU(@Path(PATH_CODE) String cod);

以上功能正常,因为日志是正确的。

但在 My Presenter 中永远不会调用 OnNext()||OnError()||onComplete() 函数。 为什么会这样?

@Override
    public void getCartItems() {
        Observable<List<Pair<WsProductDetails, Integer>>> source = appDataStore.getCartWithProductData();
        mCompositeDisposable.add(source
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribeWith(new DisposableObserver<List<Pair<WsProductDetails, Integer>>>() {
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete()");
                    }

                    @Override
                    public void onNext(List<Pair<WsProductDetails, Integer>> pairs) {
                        Log.d(TAG, "onError()");
                        view.showCartItems(pairs);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError()");
                    }
                }));

    }
}

您必须将查找 flatMap 应用于内部 fromIterable 以便它变得有限:

@Override
public Observable<List<Pair<WsProductDetails, Integer>>> getCartWithProductData() {
    return appLocalDataStore.getCartItems().toObservable().flatMap(new Function<List<CartModel>, ObservableSource<CartModel>>() {
        @Override
        public ObservableSource<CartModel> apply(List<CartModel> cartModels) throws Exception {
            return Observable.fromIterable(cartModels)
                .flatMap(new Function<CartModel, ObservableSource<Pair<WsProductDetails, Integer>>>() {
                    @Override
                    public ObservableSource<Pair<WsProductDetails, Integer>> apply(final CartModel cartModel) throws Exception {
                        return appRemoteDataStore.getProductBySKU(cartModel.getId())
                            .map(new Function<WsProductDetails, Pair<WsProductDetails, Integer>>() {
                             @Override
                             public Pair<WsProductDetails, Integer> apply(WsProductDetails wsProductDetails) throws Exception {
                                 Log.d("TestCartItems", "SKU" + wsProductDetails.getSku() + "  quantity" + cartModel.getQuantity());
                                 return new Pair<>(wsProductDetails, cartModel.getQuantity());
                             }
                         });
                     }
                }).toList().toObservable();
        }
    });
}