Rxjava2 - 如何让观察者在第一个或使用 zipWith 后停止发射项目?
Rxjava2 - how to get observer to stop emitting items after first one or use zipWith?
下面的代码可以很好地进行网络调用。但它不断发出相同的结果。我只想获取第一个结果并停止发射。没有命令我可以说只是发出第一个。我试过 take(1) 但某些原因改变了结果大小。
//class variables
FeedsModel feedsModelResult;
HashMap<Integer, ProductModel> productMap;
//method
@Override
protected Observable buildUseCaseObservable() {
/* gets feedModel then parses through each feed for product IDs. then does a network call to get each product. stores retrieved
product model in hashmap for quick retrieval. returns a pair.
*/
return feedRepository.fetchFeeds(shopId, langId, skip)
.concatMap(new Function<FeedsModel, ObservableSource<List<Feed>>>() {
@Override
public ObservableSource<List<Feed>> apply(@NonNull final FeedsModel feedsModel) throws Exception {
feedsModelResult = feedsModel;
return Observable.fromCallable(new Callable<List<Feed>>() {
@Override
public List<Feed> call() throws Exception {
return feedsModel.getFeed();
}
});
}
})
.concatMap(new Function<List<Feed>, ObservableSource<Feed>>() {
@Override
public ObservableSource<Feed> apply(@NonNull List<Feed> feeds) throws Exception {
return Observable.fromIterable(feeds);
}
}).filter(new Predicate<Feed>() {
@Override
public boolean test(@NonNull Feed feed) throws Exception {
return feed.getProducts() != null;
}
})
.concatMap(new Function<Feed, ObservableSource<Double>>() {
@Override
public ObservableSource<Double> apply(@NonNull Feed feed) throws Exception {
return Observable.fromIterable((ArrayList<Double>) feed.getProducts());
}
})
.concatMap(new Function<Double, ObservableSource<ProductModel>>() {
@Override
public ObservableSource<ProductModel> apply(@NonNull Double productId) throws Exception {
return productsRepository.fetchProduct(productId.intValue(), shopId, langId, currency);
}
}).concatMap(new Function<ProductModel, ObservableSource<Map<Integer, ProductModel>>>() {
@Override
public ObservableSource apply(@NonNull ProductModel productModel) throws Exception {
productMap.put(productModel.getIdProduct(), productModel);
return Observable.fromCallable(new Callable<Map<Integer, ProductModel>>() {
@Override
public Map<Integer, ProductModel> call() throws Exception {
return productMap;
}
});
}
}).concatMap(new Function<Map<Integer, ProductModel>, ObservableSource<Pair>>() {
@Override
public ObservableSource apply(@NonNull final Map<Integer, ProductModel> productModelMap) throws Exception {
return Observable.fromCallable(new Callable() {
@Override
public Object call() throws Exception {
return Pair.create(feedsModelResult, productMap);
}
});
}
});
}
更新:
在 onSubscribe 中,我保留对一次性对象的引用,并在获得第一个结果后在 onNext() 中处理它。这是一种有效的方法吗?
在调用的最后结果 Pair.create(feedsModelResult, productMap);我想我应该使用 zipWith 运算符来等待所有结果完成,但我不确定如何
理解你的流程并不容易,但你似乎在尝试查询一些 FeedsModel
对象,并发出单个 Pair
值来打包这个 FeedsModel
对象及其包含的一些收集的内部产品对象地图。
问题是您的 Observable
被压平了每个内部列表的可能次数,并尝试在到达地图的过程中收集它们,而流仍然将这些项目发送给最终订阅者。
您需要的是一个获取输入 FeedsModel
的流,将所有项目收集到地图中并 仅发出单个 项目,即生成的地图,然后您可以将此地图与输入 FeedsModel
以及您刚刚收集的结果地图配对。
假设您的 feedRepository.fetchFeeds
只能 return 单个 FeedsModel
项(您可以使用 Single<FeedsModel>
对其进行优化),您将在最后的流中得到单个结果。
建议使用包含结果选择器的 flatMap 变体:
feedRepository.fetchFeeds(shopId, langId, skip)
.flatMap(feedsModel ->
getProductsMapFromFeedsModelObservable(feedsModel, shopId, langId, currency)
,(feedsModel, productsMap) ->
Pair.create(feedsModel, productsMap)
);
并且 getProductsMapFromFeedsModelObservable()
是一个 Observable
,它从输入 FeedsModel 收集产品到产品的映射:
private Observable<HashMap<Integer, ProductModel>> getProductsMapFromFeedsModelObservable(
FeedsModel feedsModel, int shopId, int langId, int currency) {
return Observable.fromIterable(feedsModel.getFeed())
.filter(feed -> feed.getProducts() != null)
.flatMapIterable(feed -> feed.getProducts())
.flatMap(productId -> productsRepository.fetchProduct(productId.intValue(), shopId, langId, currency))
.reduce(new HashMap<Integer, ProductModel>(),
(productsMap, productModel) -> {
productsMap.put(productModel.getIdProduct(), productModel);
return productsMap;
})
.toObservable();
}
reduce()
用于收集所有项目以映射和发出单个项目,flatMap()
用于此处,因为您可以从并行执行中获益,无论如何可能不需要 concat()
,不确定为什么它排在第一位(保留顺序?)。
下面的代码可以很好地进行网络调用。但它不断发出相同的结果。我只想获取第一个结果并停止发射。没有命令我可以说只是发出第一个。我试过 take(1) 但某些原因改变了结果大小。
//class variables
FeedsModel feedsModelResult;
HashMap<Integer, ProductModel> productMap;
//method
@Override
protected Observable buildUseCaseObservable() {
/* gets feedModel then parses through each feed for product IDs. then does a network call to get each product. stores retrieved
product model in hashmap for quick retrieval. returns a pair.
*/
return feedRepository.fetchFeeds(shopId, langId, skip)
.concatMap(new Function<FeedsModel, ObservableSource<List<Feed>>>() {
@Override
public ObservableSource<List<Feed>> apply(@NonNull final FeedsModel feedsModel) throws Exception {
feedsModelResult = feedsModel;
return Observable.fromCallable(new Callable<List<Feed>>() {
@Override
public List<Feed> call() throws Exception {
return feedsModel.getFeed();
}
});
}
})
.concatMap(new Function<List<Feed>, ObservableSource<Feed>>() {
@Override
public ObservableSource<Feed> apply(@NonNull List<Feed> feeds) throws Exception {
return Observable.fromIterable(feeds);
}
}).filter(new Predicate<Feed>() {
@Override
public boolean test(@NonNull Feed feed) throws Exception {
return feed.getProducts() != null;
}
})
.concatMap(new Function<Feed, ObservableSource<Double>>() {
@Override
public ObservableSource<Double> apply(@NonNull Feed feed) throws Exception {
return Observable.fromIterable((ArrayList<Double>) feed.getProducts());
}
})
.concatMap(new Function<Double, ObservableSource<ProductModel>>() {
@Override
public ObservableSource<ProductModel> apply(@NonNull Double productId) throws Exception {
return productsRepository.fetchProduct(productId.intValue(), shopId, langId, currency);
}
}).concatMap(new Function<ProductModel, ObservableSource<Map<Integer, ProductModel>>>() {
@Override
public ObservableSource apply(@NonNull ProductModel productModel) throws Exception {
productMap.put(productModel.getIdProduct(), productModel);
return Observable.fromCallable(new Callable<Map<Integer, ProductModel>>() {
@Override
public Map<Integer, ProductModel> call() throws Exception {
return productMap;
}
});
}
}).concatMap(new Function<Map<Integer, ProductModel>, ObservableSource<Pair>>() {
@Override
public ObservableSource apply(@NonNull final Map<Integer, ProductModel> productModelMap) throws Exception {
return Observable.fromCallable(new Callable() {
@Override
public Object call() throws Exception {
return Pair.create(feedsModelResult, productMap);
}
});
}
});
}
更新: 在 onSubscribe 中,我保留对一次性对象的引用,并在获得第一个结果后在 onNext() 中处理它。这是一种有效的方法吗?
在调用的最后结果 Pair.create(feedsModelResult, productMap);我想我应该使用 zipWith 运算符来等待所有结果完成,但我不确定如何
理解你的流程并不容易,但你似乎在尝试查询一些 FeedsModel
对象,并发出单个 Pair
值来打包这个 FeedsModel
对象及其包含的一些收集的内部产品对象地图。
问题是您的 Observable
被压平了每个内部列表的可能次数,并尝试在到达地图的过程中收集它们,而流仍然将这些项目发送给最终订阅者。
您需要的是一个获取输入 FeedsModel
的流,将所有项目收集到地图中并 仅发出单个 项目,即生成的地图,然后您可以将此地图与输入 FeedsModel
以及您刚刚收集的结果地图配对。
假设您的 feedRepository.fetchFeeds
只能 return 单个 FeedsModel
项(您可以使用 Single<FeedsModel>
对其进行优化),您将在最后的流中得到单个结果。
建议使用包含结果选择器的 flatMap 变体:
feedRepository.fetchFeeds(shopId, langId, skip)
.flatMap(feedsModel ->
getProductsMapFromFeedsModelObservable(feedsModel, shopId, langId, currency)
,(feedsModel, productsMap) ->
Pair.create(feedsModel, productsMap)
);
并且 getProductsMapFromFeedsModelObservable()
是一个 Observable
,它从输入 FeedsModel 收集产品到产品的映射:
private Observable<HashMap<Integer, ProductModel>> getProductsMapFromFeedsModelObservable(
FeedsModel feedsModel, int shopId, int langId, int currency) {
return Observable.fromIterable(feedsModel.getFeed())
.filter(feed -> feed.getProducts() != null)
.flatMapIterable(feed -> feed.getProducts())
.flatMap(productId -> productsRepository.fetchProduct(productId.intValue(), shopId, langId, currency))
.reduce(new HashMap<Integer, ProductModel>(),
(productsMap, productModel) -> {
productsMap.put(productModel.getIdProduct(), productModel);
return productsMap;
})
.toObservable();
}
reduce()
用于收集所有项目以映射和发出单个项目,flatMap()
用于此处,因为您可以从并行执行中获益,无论如何可能不需要 concat()
,不确定为什么它排在第一位(保留顺序?)。