我如何在 RxJava 中显式地发出 Flowable 完成的信号?
How can I explicitly signal completion of a Flowable in RxJava?
我正在尝试创建一个 Flowable
,它包装了一个 Iterable
。我定期将元素推送到我的 Iterable
,但似乎 completion 事件是隐含的。我不知道如何表示处理已完成。例如在我的代码中:
// note that this code is written in Kotlin
val iterable = LinkedBlockingQueue<Int>()
iterable.addAll(listOf(1, 2, 3))
val flowable = Flowable.fromIterable(iterable)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")})
iterable.add(4)
Thread.sleep(1000)
iterable.add(5)
Thread.sleep(1000)
这会打印:
1
2
3
4
completed
我检查了 Flowable
接口的来源,但似乎我无法明确表示 Flowable
已完成。我该怎么做?在我的程序中,我发布的事件之间有一些延迟,我想明确何时 complete
事件流。
澄清:
我有一个很长的 运行 进程可以发出事件。我将它们收集在一个队列中,并公开了一个方法,该方法 returns 一个环绕我的队列的 Flowable。问题是当我创建 Flowable 时队列中可能已经有元素了。我只会处理一次事件,我知道事件流何时停止,所以我知道何时需要完成 Flowable。
使用 .fromIterable
是为您的用例创建 Flowable
的错误方法。
我实际上并不清楚那个用例是什么,但你可能想使用 Flowable.create()
或 PublishSubject
val flowable = Flowable.create<Int>( {
it.onNext(1)
it.onNext(2)
it.onComplete()
}, BackpressureStrategy.MISSING)
val publishSubject = PublishSubject.create<Int>()
val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING)
//This data will be dropepd unless something is subscribed to the flowable.
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.onComplete()
当然,您如何处理 back-pressure 将取决于数据源的性质。
正如 akarnokd 所建议的那样,ReplayProcessor 完全按照您的意愿行事。将 iterable.add(item)
替换为 processor.onNext(item)
,完成后调用 processor.onComplete()
。
我正在尝试创建一个 Flowable
,它包装了一个 Iterable
。我定期将元素推送到我的 Iterable
,但似乎 completion 事件是隐含的。我不知道如何表示处理已完成。例如在我的代码中:
// note that this code is written in Kotlin
val iterable = LinkedBlockingQueue<Int>()
iterable.addAll(listOf(1, 2, 3))
val flowable = Flowable.fromIterable(iterable)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")})
iterable.add(4)
Thread.sleep(1000)
iterable.add(5)
Thread.sleep(1000)
这会打印:
1 2 3 4 completed
我检查了 Flowable
接口的来源,但似乎我无法明确表示 Flowable
已完成。我该怎么做?在我的程序中,我发布的事件之间有一些延迟,我想明确何时 complete
事件流。
澄清: 我有一个很长的 运行 进程可以发出事件。我将它们收集在一个队列中,并公开了一个方法,该方法 returns 一个环绕我的队列的 Flowable。问题是当我创建 Flowable 时队列中可能已经有元素了。我只会处理一次事件,我知道事件流何时停止,所以我知道何时需要完成 Flowable。
使用 .fromIterable
是为您的用例创建 Flowable
的错误方法。
我实际上并不清楚那个用例是什么,但你可能想使用 Flowable.create()
或 PublishSubject
val flowable = Flowable.create<Int>( {
it.onNext(1)
it.onNext(2)
it.onComplete()
}, BackpressureStrategy.MISSING)
val publishSubject = PublishSubject.create<Int>()
val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING)
//This data will be dropepd unless something is subscribed to the flowable.
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.onComplete()
当然,您如何处理 back-pressure 将取决于数据源的性质。
正如 akarnokd 所建议的那样,ReplayProcessor 完全按照您的意愿行事。将 iterable.add(item)
替换为 processor.onNext(item)
,完成后调用 processor.onComplete()
。