RxJava 阻塞流直到其他观察者完成

RxJava block stream until other observer completes

我在后台有一个很长的 运行 进程,它正在处理文件并使用 subject.onNext(progress) 将其进度发布到 BehaviorSubject 并在完成后调用 subject.onCompleted

在流程的某个时刻,我想订阅那个 subject 并获取当前进度或等到它完成。

prepareOtherStuff()
    .flatMap(validate())
    .????? <- want to subscribe here
    .map(finalize())
    .subscribe()

我在 ????? 部分遇到问题。无法弄清楚如何阻止流并等待文件处理完成并获取文件处理进度以将其显示给用户。

例如:

-- other files already processed, don't care about them --
File 8 of 10 processed
File 9 of 10 processed
-- onCompleted received --

我怎样才能实现这种行为?

您可以使用 .concatWith(其中 TheType 是从 .flatMap(validate()) 返回的 Observable 的通用类型):

prepareOtherStuff()
  .flatMap(validate())
  .concatWith(subject
             .doOnNext(m -> log.info(m))
             .ignoreElements()
             .cast(TheType.class))
  .map(finalize())
  .subscribe()