传播可观察的,然后将其重新组合在一起

Spreading observable and then grouping it back together

问题

我需要创建这样的操作:

get all elements from DB -> send call to API that these elements are marked as read -> save status of read elements

我正在尝试用 RxJava 建立一个好的链,我想:

  1. 从数据库收集数据
  2. 创建了带有 List<Integer> 和 remoteIds
  3. 的 Observable
  4. 对这些元素调用 API
  5. 收集所有回复并将其作为 List<Integer>
  6. 传递
  7. 保存回复

问题是第 3 点和第 4 点的传播和分组。我不确定如何制作这样的链。正如您在下面看到的,我的代码停留在第 3 点,因为我不知道如何为每个请求收集数据。是否有某种类型的转换可能对我的情况有用?

实际进度又名代码

 sub = getDb()
            .createQuery(DbContract.Notification.TABLE_NAME, sql, String.valueOf(NOTIFICATION_UNREAD))
            .map(new CursorListMapper<>(new NotificationPersistenceModel()))
            .map(new Func1<List<DataNotification>, Observable<List<Integer>>>() {
                @Override
                public Observable<List<Integer>> call(List<DataNotification> notifications) {
                    List<Integer> ids = new ArrayList<>();
                    for (DataNotification notification : notifications) {
                        ids.add(notification.getRemoteId());
                    }

                    return Observable.just(ids);
                }
            })
            .subscribe();

编辑

不是映射到 Observable<List<Integer>>,而是映射到 List<Integer>,然后使用 Observable.from() 将列表转换为发射每个列表元素的 Observable。接下来,您只需将其平面映射到 Api 调用。这是一个例子

getDb()
        .createQuery(...)
        .map(...)
        .map(...) // map to List<Integer>
        .flatMap(new Func1<List<Integer>, Observable<List<Response>>>() {
            @Override
            public Observable<List<Response>> call(List<Integer> integers) {
                return Observable.from(list)
                        .flatMap(new Func1<Integer, Observable<Response>>() {
                           @Override
                           public Observable<Response> call(Integer integer) {
                                return api.call(integer);
                            }
                        })
                        .toList();
            }
        })

你可以继续mapping/flatMapping达到任何你想达到的目标