如何广播一个冷的可观察对象:用背压重播?
How to broadcast a cold observable: Replay with back-pressure?
我实际上使用的是 Scala,但这个问题对所有 Rx 和流框架都是通用的。
我的用例是我有一个生成的可观察对象(因此很冷),我希望多个消费者并行使用完全相同的值,但我希望它们具有明显不同的吞吐量。
我需要的可以通过重播广播一个可观察对象来完成,但我看到使用最大缓冲区大小重播的常见策略是在溢出时从缓冲区中删除元素(然后对于最慢的消费者来说丢失) ) 而不是向生产者施压。如果您将所有广播的 observable 视为热的,这是有道理的,但是,就我而言,我知道它实际上是冷的并且可以被反压。
有什么方法可以在任何兼容 JVM 反应流的框架中实现这一点吗?
非常感谢!
RxJava 通过 publish
运算符支持这一点,该运算符协调来自各个消费者的请求,也就是说,它以与最慢的消费者请求一样快的固定速率进行请求。不幸的是,目前没有 RxScala 2,只有 RxJava 2 支持 Reactive-Streams 规范,因此,将其转换为 Scala 可能会带来一些不便:
Flowable.fromPublisher(Flowable.range(1, 1000))
.publish(f ->
Flowable.mergeArray(
f.observeOn(Schedulers.computation()).map(v -> v * v),
f.observeOn(Schedulers.computation()).map(v -> v * v * v)
)
)
.blockingSubscribe(System.out::println);
另一种方法是使用 ConnectableObservable
并在所有消费者都订阅后手动连接:
ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000))
.publish();
co.observeOn(Schedulers.computation()).map(v -> v * v)
.subscribe(System.out::println);
co.connect();
我实际上使用的是 Scala,但这个问题对所有 Rx 和流框架都是通用的。
我的用例是我有一个生成的可观察对象(因此很冷),我希望多个消费者并行使用完全相同的值,但我希望它们具有明显不同的吞吐量。
我需要的可以通过重播广播一个可观察对象来完成,但我看到使用最大缓冲区大小重播的常见策略是在溢出时从缓冲区中删除元素(然后对于最慢的消费者来说丢失) ) 而不是向生产者施压。如果您将所有广播的 observable 视为热的,这是有道理的,但是,就我而言,我知道它实际上是冷的并且可以被反压。
有什么方法可以在任何兼容 JVM 反应流的框架中实现这一点吗?
非常感谢!
RxJava 通过 publish
运算符支持这一点,该运算符协调来自各个消费者的请求,也就是说,它以与最慢的消费者请求一样快的固定速率进行请求。不幸的是,目前没有 RxScala 2,只有 RxJava 2 支持 Reactive-Streams 规范,因此,将其转换为 Scala 可能会带来一些不便:
Flowable.fromPublisher(Flowable.range(1, 1000))
.publish(f ->
Flowable.mergeArray(
f.observeOn(Schedulers.computation()).map(v -> v * v),
f.observeOn(Schedulers.computation()).map(v -> v * v * v)
)
)
.blockingSubscribe(System.out::println);
另一种方法是使用 ConnectableObservable
并在所有消费者都订阅后手动连接:
ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000))
.publish();
co.observeOn(Schedulers.computation()).map(v -> v * v)
.subscribe(System.out::println);
co.connect();