RxJava2,repeatUntil 和 whileDo

RxJava2, repeatUntil and whileDo

我不确定我是否理解 repeatUntil。我想要这样的东西:

  1. 仅从
  2. 创建可观察对象
  3. 检查条件,如果为真运行所有下游,否则就完成
  4. 重复第二点

这是我的测试 Observable,它 运行 进入 IndexOutOfBoundsException

    List<Integer> values = new ArrayList<Integer>();

    Observable<Integer> observable = Observable.just(values)
            .repeatUntil(() -> values.isEmpty())
            .map(integers -> values.remove(0));
    observable.subscribe(integer -> System.out.println(integer), 
          throwable -> System.out.println(throwable));

在我看来,repeatUntil 会在最后第一次执行。

我错了吗?如果我是对的,如何在开始时检查条件?

我也看到了,rxJava1中有whileDo操作符。它现在在哪里?

UPD

我可以通过在 repeatUntil 之前使用 takeWhile 以某种方式解决问题,但也许有更好的解决方案?

    List<Integer> values = new ArrayList<Integer>();
    /*
    values.add(0);
    values.add(1);
    values.add(2);
    values.add(3);
    values.add(4);
    */

    Observable<Integer> observable = Observable.just(values)
            .takeWhile(integers -> values.size() > 0)
            .map(integers -> values.remove(0))
            .repeatUntil(() -> values.isEmpty());
    observable.subscribe(integer -> System.out.println("Test " + integer), 
             throwable -> System.out.println(throwable), 
             () -> System.out.println("Completed"));

RepeatUntil 在其上游完成时调用该函数,这是在它发出空 ArrayList 信号之后,因此您会得到异常,因为 remove 会发现空 ArrayList

whileDo 操作员住在 RxJava 2 Extensions 项目中。

List<Integer> values = new ArrayList<Integer>();

StatementFlowable.whileDo(
    Observable.just(values),
    () -> values.isEmpty()
)
.map(integers -> values.remove(0))
.subscribe(integer -> System.out.println(integer), 
      throwable -> System.out.println(throwable))
;