RxJava 阻塞流直到其他观察者完成
RxJava block stream until other observer completes
我在后台有一个很长的 运行 进程,它正在处理文件并使用 subject.onNext(progress)
将其进度发布到 BehaviorSubject
并在完成后调用 subject.onCompleted
。
在流程的某个时刻,我想订阅那个 subject
并获取当前进度或等到它完成。
prepareOtherStuff()
.flatMap(validate())
.????? <- want to subscribe here
.map(finalize())
.subscribe()
我在 ?????
部分遇到问题。无法弄清楚如何阻止流并等待文件处理完成并获取文件处理进度以将其显示给用户。
例如:
-- other files already processed, don't care about them --
File 8 of 10 processed
File 9 of 10 processed
-- onCompleted received --
我怎样才能实现这种行为?
您可以使用 .concatWith
(其中 TheType
是从 .flatMap(validate())
返回的 Observable 的通用类型):
prepareOtherStuff()
.flatMap(validate())
.concatWith(subject
.doOnNext(m -> log.info(m))
.ignoreElements()
.cast(TheType.class))
.map(finalize())
.subscribe()
我在后台有一个很长的 运行 进程,它正在处理文件并使用 subject.onNext(progress)
将其进度发布到 BehaviorSubject
并在完成后调用 subject.onCompleted
。
在流程的某个时刻,我想订阅那个 subject
并获取当前进度或等到它完成。
prepareOtherStuff()
.flatMap(validate())
.????? <- want to subscribe here
.map(finalize())
.subscribe()
我在 ?????
部分遇到问题。无法弄清楚如何阻止流并等待文件处理完成并获取文件处理进度以将其显示给用户。
例如:
-- other files already processed, don't care about them --
File 8 of 10 processed
File 9 of 10 processed
-- onCompleted received --
我怎样才能实现这种行为?
您可以使用 .concatWith
(其中 TheType
是从 .flatMap(validate())
返回的 Observable 的通用类型):
prepareOtherStuff()
.flatMap(validate())
.concatWith(subject
.doOnNext(m -> log.info(m))
.ignoreElements()
.cast(TheType.class))
.map(finalize())
.subscribe()