RxJava2,显式完成一个冷源(Flowable)
RxJava2, completing a cold source (Flowable) explicitly
先搜索了一下,没有找到好的答案,所以就这样:)
尝试将 RxJava2 应用到我们在工作中使用的现有管道 - 热源和冷源。虽然停止一个热门案例(用 Flowable.create 包装)很简单,但另一种情况似乎更棘手。
简而言之,我希望能够使源完成 和 在被处置之前耗尽管道中已经存在的项目。
这就是我想出的(takeUntil() 和一个特殊的项目来结束序列),但是关于是否有更好的方法的问题仍然存在
(Flowables.repeatCallable 是 https://github.com/akarnokd/RxJava2Extensions 的扩展,比常用方法更适合我的情况)
buffer/map/reduce 只是为了创建与 某些 处理的相似之处 :)
final AtomicInteger counter = new AtomicInteger();
AtomicBoolean stop = new AtomicBoolean(false);
Disposable d = Flowables.repeatCallable(() -> {
return stop.get() ? -1 : counter.getAndIncrement();
}
)
.doOnCancel(() -> {
debug("cancelled");
})
.doOnRequest(l ->
debug(Long.toString(l) + " values requested")
)
.takeUntil(val -> val < 0)
.filter(val -> val >= 0)
.buffer(2000, TimeUnit.MILLISECONDS, scheduler, 15, ArrayListSupplier.asCallable(), true)
.map(list -> {
debug("received " + list.size() + " items: " + list);
return list.size();
})
.reduce(0, (sum, item) -> sum + item)
.subscribeOn(scheduler)
.subscribe(i -> debug("Items received: " + i));
然后,在某个时候:
stop.set(true);
while (!d.isDisposed()) {
// wait
}
这在收到停止信号后完成,最终的 onSuccess 处理程序将执行。
奇怪的是,onCancel 处理程序被调用了两次:
首先来自
onNext:69, FlowableTakeUntilPredicate$InnerSubscriber (io.reactivex.internal.operators.flowable)
然后从
cancel:100, FlowableTakeUntilPredicate$InnerSubscriber (io.reactivex.internal.operators.flowable)
dispose:543, FlowableBufferTimed$BufferExactBoundedSubscriber (io.reactivex.internal.operators.flowable)
有什么建议吗?
您正试图将流量控制用于并非真正适用的目的。你因此变得愤怒,因为 buffer()
打败了你所有的流量控制机制。
同样,takeUntil()
将在谓词成功后立即取消订阅您的可观察对象。那时没有 "draining",因为所有可接受的值都已传递到下游。
尝试将 stop
用作可观察混合模式的控件,但同样,它并没有按照您的想法行事。
引入 ReplayProcessor
或类似中介将允许冷源即使在没有订阅处于活动状态时也能继续发送数据。
最终,您必须分离出您的顾虑。有几个关于将冷可观察量转换为热可观察量的好教程。
先搜索了一下,没有找到好的答案,所以就这样:)
尝试将 RxJava2 应用到我们在工作中使用的现有管道 - 热源和冷源。虽然停止一个热门案例(用 Flowable.create 包装)很简单,但另一种情况似乎更棘手。 简而言之,我希望能够使源完成 和 在被处置之前耗尽管道中已经存在的项目。 这就是我想出的(takeUntil() 和一个特殊的项目来结束序列),但是关于是否有更好的方法的问题仍然存在
(Flowables.repeatCallable 是 https://github.com/akarnokd/RxJava2Extensions 的扩展,比常用方法更适合我的情况) buffer/map/reduce 只是为了创建与 某些 处理的相似之处 :)
final AtomicInteger counter = new AtomicInteger();
AtomicBoolean stop = new AtomicBoolean(false);
Disposable d = Flowables.repeatCallable(() -> {
return stop.get() ? -1 : counter.getAndIncrement();
}
)
.doOnCancel(() -> {
debug("cancelled");
})
.doOnRequest(l ->
debug(Long.toString(l) + " values requested")
)
.takeUntil(val -> val < 0)
.filter(val -> val >= 0)
.buffer(2000, TimeUnit.MILLISECONDS, scheduler, 15, ArrayListSupplier.asCallable(), true)
.map(list -> {
debug("received " + list.size() + " items: " + list);
return list.size();
})
.reduce(0, (sum, item) -> sum + item)
.subscribeOn(scheduler)
.subscribe(i -> debug("Items received: " + i));
然后,在某个时候:
stop.set(true);
while (!d.isDisposed()) {
// wait
}
这在收到停止信号后完成,最终的 onSuccess 处理程序将执行。 奇怪的是,onCancel 处理程序被调用了两次: 首先来自
onNext:69, FlowableTakeUntilPredicate$InnerSubscriber (io.reactivex.internal.operators.flowable)
然后从
cancel:100, FlowableTakeUntilPredicate$InnerSubscriber (io.reactivex.internal.operators.flowable)
dispose:543, FlowableBufferTimed$BufferExactBoundedSubscriber (io.reactivex.internal.operators.flowable)
有什么建议吗?
您正试图将流量控制用于并非真正适用的目的。你因此变得愤怒,因为 buffer()
打败了你所有的流量控制机制。
同样,takeUntil()
将在谓词成功后立即取消订阅您的可观察对象。那时没有 "draining",因为所有可接受的值都已传递到下游。
尝试将 stop
用作可观察混合模式的控件,但同样,它并没有按照您的想法行事。
引入 ReplayProcessor
或类似中介将允许冷源即使在没有订阅处于活动状态时也能继续发送数据。
最终,您必须分离出您的顾虑。有几个关于将冷可观察量转换为热可观察量的好教程。