Rxjava2 - 如何让观察者在第一个或使用 zipWith 后停止发射项目?

Rxjava2 - how to get observer to stop emitting items after first one or use zipWith?

下面的代码可以很好地进行网络调用。但它不断发出相同的结果。我只想获取第一个结果并停止发射。没有命令我可以说只是发出第一个。我试过 take(1) 但某些原因改变了结果大小。

//class variables
  FeedsModel feedsModelResult;
    HashMap<Integer, ProductModel> productMap;

//method

    @Override
       protected Observable buildUseCaseObservable() {
           /* gets feedModel then parses through each feed for product IDs. then does a network call to get each product. stores retrieved
           product model in hashmap for quick retrieval. returns a pair.
            */
           return feedRepository.fetchFeeds(shopId, langId, skip)
                   .concatMap(new Function<FeedsModel, ObservableSource<List<Feed>>>() {
                       @Override
                       public ObservableSource<List<Feed>> apply(@NonNull final FeedsModel feedsModel) throws Exception {
                           feedsModelResult = feedsModel;
                           return Observable.fromCallable(new Callable<List<Feed>>() {
                               @Override
                               public List<Feed> call() throws Exception {

                                   return feedsModel.getFeed();
                               }
                           });
                       }
                   })
                   .concatMap(new Function<List<Feed>, ObservableSource<Feed>>() {
                       @Override
                       public ObservableSource<Feed> apply(@NonNull List<Feed> feeds) throws Exception {

                           return Observable.fromIterable(feeds);
                       }
                   }).filter(new Predicate<Feed>() {
                       @Override
                       public boolean test(@NonNull Feed feed) throws Exception {
                           return feed.getProducts() != null;
                       }
                   })
                   .concatMap(new Function<Feed, ObservableSource<Double>>() {
                       @Override
                       public ObservableSource<Double> apply(@NonNull Feed feed) throws Exception {
                           return Observable.fromIterable((ArrayList<Double>) feed.getProducts());
                       }
                   })
                   .concatMap(new Function<Double, ObservableSource<ProductModel>>() {
                       @Override
                       public ObservableSource<ProductModel> apply(@NonNull Double productId) throws Exception {
                           return productsRepository.fetchProduct(productId.intValue(), shopId, langId, currency);
                       }
                   }).concatMap(new Function<ProductModel, ObservableSource<Map<Integer, ProductModel>>>() {
                       @Override
                       public ObservableSource apply(@NonNull ProductModel productModel) throws Exception {

                           productMap.put(productModel.getIdProduct(), productModel);
                           return Observable.fromCallable(new Callable<Map<Integer, ProductModel>>() {
                               @Override
                               public Map<Integer, ProductModel> call() throws Exception {
                                   return productMap;
                               }
                           });
                       }
                   }).concatMap(new Function<Map<Integer, ProductModel>, ObservableSource<Pair>>() {
                       @Override
                       public ObservableSource apply(@NonNull final Map<Integer, ProductModel> productModelMap) throws Exception {
                           return Observable.fromCallable(new Callable() {
                               @Override
                               public Object call() throws Exception {
                                   return Pair.create(feedsModelResult, productMap);
                               }
                           });
                       }
                   });
       }

更新: 在 onSubscribe 中,我保留对一次性对象的引用,并在获得第一个结果后在 onNext() 中处理它。这是一种有效的方法吗?

在调用的最后结果 Pair.create(feedsModelResult, productMap);我想我应该使用 zipWith 运算符来等待所有结果完成,但我不确定如何

理解你的流程并不容易,但你似乎在尝试查询一些 FeedsModel 对象,并发出单个 Pair 值来打包这个 FeedsModel 对象及其包含的一些收集的内部产品对象地图。

问题是您的 Observable 被压平了每个内部列表的可能次数,并尝试在到达地图的过程中收集它们,而流仍然将这些项目发送给最终订阅者。

您需要的是一个获取输入 FeedsModel 的流,将所有项目收集到地图中并 仅发出单个 项目,即生成的地图,然后您可以将此地图与输入 FeedsModel 以及您刚刚收集的结果地图配对。
假设您的 feedRepository.fetchFeeds 只能 return 单个 FeedsModel 项(您可以使用 Single<FeedsModel> 对其进行优化),您将在最后的流中得到单个结果。

建议使用包含结果选择器的 flatMap 变体:

feedRepository.fetchFeeds(shopId, langId, skip)
 .flatMap(feedsModel ->
         getProductsMapFromFeedsModelObservable(feedsModel, shopId, langId, currency)
         ,(feedsModel, productsMap) ->
               Pair.create(feedsModel, productsMap)
         );

并且 getProductsMapFromFeedsModelObservable() 是一个 Observable,它从输入 FeedsModel 收集产品到产品的映射:

private Observable<HashMap<Integer, ProductModel>> getProductsMapFromFeedsModelObservable(
            FeedsModel feedsModel, int shopId, int langId, int currency) {
        return Observable.fromIterable(feedsModel.getFeed())
                .filter(feed -> feed.getProducts() != null)
                .flatMapIterable(feed -> feed.getProducts())
                .flatMap(productId -> productsRepository.fetchProduct(productId.intValue(), shopId, langId, currency))
                .reduce(new HashMap<Integer, ProductModel>(),
                        (productsMap, productModel) -> {
                            productsMap.put(productModel.getIdProduct(), productModel);
                            return productsMap;
                        })
                .toObservable();
    }

reduce() 用于收集所有项目以映射和发出单个项目,flatMap() 用于此处,因为您可以从并行执行中获益,无论如何可能不需要 concat(),不确定为什么它排在第一位(保留顺序?)。