如何 concatWith 使用来自先前 Observable 的信息进行分页

How to concatWith using information from previous Observable for pagination

假设我有一个名为 List<UUID> listOf(int page) 的阻塞方法。 如果我想像这样分页,一个想法是做这样的事情:

public Observable<UUID> allOf(int initialPage) {
    return fromCallable( () -> listOf(initialPage))
            .concatWith( fromCallable( () -> allOf(initialPage + 1)))
            .flatMap(x -> from(x));
}

如果我的服务不使用页码而是使用列表的最后一个元素来查找下一个元素,我如何使用 RxJava 实现它?

我仍然想获得像 allOf(0).take(20) 那样的效果,并通过 concatWith 在第一个 Observable 完成时获得对第二个 Observable 的调用。

但是当我需要上次通话的信息​​时,我该怎么做呢?

您可以使用主题将下一个页码发送回序列的开头:

List<Integer> service(int index) {
    System.out.println("Reading " + index);
    List<Integer> list = new ArrayList<>();
    for (int i = index; i < index + 20; i++) {
        list.add(i);
    }
    return list;
}

Flowable<List<Integer>> getPage(int index) {
    FlowableProcessor<Integer> pager = UnicastProcessor.<Integer>create()
        .toSerialized();
    pager.onNext(index);

    return pager.observeOn(Schedulers.trampoline(), true, 1)
    .map(v -> {
        List<Integer> list = service(v);
        pager.onNext(list.get(list.size() - 1) + 1);
        return list;
    })
    ;
}

@Test
public void testPager() {
    getPage(0).take(20)
    .subscribe(System.out::println, Throwable::printStackTrace);
}