如何在 RxJava 2 中的 groupBy 之后获取列表?

How to get a list after groupBy in RxJava 2?

我有一份交易清单。每笔交易都有货币和金额信息等。我想创建一个持有列表,所以当前持有的货币数量。我从 groupBy() 开始,然后继续减少。看来我必须先订阅才能对结果做任何事情,因为这会给我一个错误:

Observable.fromIterable(transactions)
            .groupBy(Transaction::getCurrency)
            .flatMap(t -> t.reduce(new Holding(t.getKey()), (holding, transaction) -> holding.addTransaction(transaction.getAmount()))

它说 "no instance of type variable R exist so that Single conforms to ObservableSource< ? extends R>"。

另一方面,如果我尝试这样做:

Observable.fromIterable(transactions)
            .groupBy(Transaction::getCurrency)
            .subscribe((GroupedObservable<String, Transaction> r) -> r.reduce(new Holding(r.getKey()), (holding, transaction) -> holding.addTransaction(transaction.getAmount()))
                    .toObservable()
                    .subscribe(t -> {
                                //t is a single Holding.
                            }
                    ));

我无法获取列表,因为我已经订阅了分组流。我可以把它加起来,但我很确定有一个更优雅的解决方案,但我想不出来。

解决方案 基于 akarnokd 的回答:

Observable.fromIterable(transactions)
            .groupBy(Transaction::getCurrency)
            .flatMapSingle(Observable::toList)
            .map(Holding::new)
            .toList()
            .subscribe(holdings -> {
                whatever(holdings);
            });

如文档所述,reduce 函数

applies a function to each item emitted by an Observable, sequentially, and emit the final value.

这是您获得单个值的方式(实际上,对于组中的每个 Observable,您都获得一个项目)。

您可以在获得列表后推迟 reduce 操作。你可以用这个替换你的第一个 long subscribe:

.subscribe(group -> group.toList()

然后你会根据你拥有的组数得到一些 Observables,每个发出一个你预定义类型的 List

注意:不确定,但您可以将第一个 subscribe 替换为 flatMap,将 GroupedObservable 转换为发出列表的 Observable项目数。

(从我对 post 的评论):

试试 flatMapSingle 大写。此外,从 onNext 处理程序中订阅是一种不好的做法,因为你失去了 RxJava 的组合属性。

这肯定有效

public Single<Map<Integer, List<Category>>> getSubCategoryListById(List<Category> categoryList) {

    return Flowable.just(categoryList)
        .flatMapIterable(new Function<List<Category>, Iterable<Category>>() {
          @Override public Iterable<Category> apply(List<Category> categories) throws Exception {
            return categories;
          }
        })
        .filter(new Predicate<Category>() {
          @Override public boolean test(Category category) throws Exception {
            return category.parent_id != 0;
          }
        })
        .groupBy(new Function<Category, Integer>() {
          @Override public Integer apply(Category category) throws Exception {
            return category.category_id;
          }
        })
        .flatMapSingle(new Function<GroupedFlowable<Integer, Category>, Single<List<Category>>>() {
          @Override public Single<List<Category>> apply(
              GroupedFlowable<Integer, Category> integerCategoryGroupedFlowable) throws Exception {
            return integerCategoryGroupedFlowable.toList();
          }
        })
        .toMap(new Function<List<Category>, Integer>() {
          @Override public Integer apply(List<Category> categories) throws Exception {
            return categories.get(0).category_id;
          }
        });
  }