将 Single<User> 的 Flowable 转换为 List<User> 的 Flowable

Transform a Flowable of Single's<User> into a Flowable of List<User>

我有一个用于将 UserScoreTO 列表分组到单个用户对象(多对一关系)的 Rx 流。

public void execute() {
    getUsers()
            .flatMap(list -> Flowable.fromIterable(list))
            .groupBy(userScoreTO -> userScoreTO.id)
            .flatMap(groups -> Flowable.fromCallable(() -> groups.collect(User::new, (user, userscore) -> {
                user.id = userscore.id;
                user.name = userscore.name;
                user.totalScore += userscore.score;
            }))).subscribe(userSingle -> userSingle.subscribe(new SingleObserver<User>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onSuccess(User user) {
            System.out.println(user);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(e);

        }
    }));

}

如您所见,为了消费这个可流动的东西,我订阅了它,消费了它发出的一些 Single's。所以我也订阅了那首单曲。虽然这甚至有效,但它有点烦人......我想只订阅一次并消费一组用户......

前几天又问了一个问题about this same code。 class 的完整代码在那里。

在末尾放置一个 flatMap 就不需要嵌套 subscribe

示例:

getUsers()
    .flatMap(list -> Flowable.fromIterable(list))
    .groupBy(userScoreTO -> userScoreTO.id)
    .flatMap(groups -> 
        Flowable.fromCallable(() -> 
            groups.collect(User::new, (user, userscore) -> {
                user.id = userscore.id;
                user.name = userscore.name;
                user.totalScore += userscore.score;
            }
    )))
    .flatMap(it -> it.toFlowable()) // <-- unwrap the singles
    .subscribe(user -> System.out.println(user));