RxJava 2 force complete chain with infinite observable

RxJava 2 force complete chain with infinite observable

我有一个可观察链,顶部有无限可观察对象,后面有非无限可观察对象。像这样:

repo.infinitGetItems()
      .switchMap(items -> Observable
                          .just(items)
                          .flatMap(items -> repo.nonInfinitObs(items)));

我想要的是在 repo.nonInfinitObs 发送 onComplete 事件时完成所有链。现在还没有完成,因为 repo.infinitGetItems() 还活着。

我可以在 rxJava2 中对整个链执行强制完成之类的操作吗?

您可以通过 takeUntil 和一些流外部方式停止主序列:

PublishSubject<Integer> stop = PublishSubject.create();

repo.infinitGetItems()
  .takeUntil(stop)
  .switchMap(items -> repo.nonInfinitObs(items)
                      .doOnComplete(() -> stop.onComplete())
  );