Observable 支持反应式拉动
Observable supporting reactive pull
我一直在为我认为是一个非常基本的问题而苦苦挣扎。
我有一个 Flowable
,它从网络中检索一捆物品并发出它们。
Flowable
.create(new FlowableOnSubscribe<Item>() {
@Override
public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
Item item = retrieveNext();
while (item != null) {
emitter.onNext(item);
if (item.isLast) {
break;
}
item = retrieveNext();
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
// ...
}
});
我需要它在每个请求的基础上工作。也就是说,我保留一个Subscription
,必要时调用subscription.request(n)
。
关于 Backpressure 的文章("Reactive pull" 部分)描述了 Observer
的观点,并简单地提到了
For [reactive pull] to work, though, Observables A and B must respond
correctly to the request() <...> such support is not a requirement for
Observables
它没有详细说明原则上如何实现这种支持。
是否有实现此类 Observable
/Flowable
的通用模式?我没有找到一个很好的例子或参考。我能想到的一个糟糕的选择是有一个监视器,当 emitter.requested() == 0
时我会在我的 while
循环中锁定并从另一个线程在 doOnRequest()
中解锁。但是这种方法感觉很麻烦。
那么这个"slowing down"一般是如何实现的呢?
谢谢。
使用Flowable.generate
:
Flowable.generate(
() -> generateItemRetriever(),
(Retriever<Item> retriever, Emitter<Item> emitter) -> {
Item item = retriever.retrieveNext();
if(item!=null && !item.isLast()) {
emitter.onNext(item);
} else {
emitter.onComplete();
}
}
)
生成的 Flowable 将具有背压感知和取消订阅感知功能。如果您的 Retriever
具有需要处理的状态,请使用 3 参数 generate
重载。
但是,如果您正在进行 HTTP 调用,我建议您研究一下 Retrofit 之类的东西,它与 RxJava 配合得很好。
我一直在为我认为是一个非常基本的问题而苦苦挣扎。
我有一个 Flowable
,它从网络中检索一捆物品并发出它们。
Flowable
.create(new FlowableOnSubscribe<Item>() {
@Override
public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
Item item = retrieveNext();
while (item != null) {
emitter.onNext(item);
if (item.isLast) {
break;
}
item = retrieveNext();
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
// ...
}
});
我需要它在每个请求的基础上工作。也就是说,我保留一个Subscription
,必要时调用subscription.request(n)
。
关于 Backpressure 的文章("Reactive pull" 部分)描述了 Observer
的观点,并简单地提到了
For [reactive pull] to work, though, Observables A and B must respond correctly to the request() <...> such support is not a requirement for Observables
它没有详细说明原则上如何实现这种支持。
是否有实现此类 Observable
/Flowable
的通用模式?我没有找到一个很好的例子或参考。我能想到的一个糟糕的选择是有一个监视器,当 emitter.requested() == 0
时我会在我的 while
循环中锁定并从另一个线程在 doOnRequest()
中解锁。但是这种方法感觉很麻烦。
那么这个"slowing down"一般是如何实现的呢?
谢谢。
使用Flowable.generate
:
Flowable.generate(
() -> generateItemRetriever(),
(Retriever<Item> retriever, Emitter<Item> emitter) -> {
Item item = retriever.retrieveNext();
if(item!=null && !item.isLast()) {
emitter.onNext(item);
} else {
emitter.onComplete();
}
}
)
生成的 Flowable 将具有背压感知和取消订阅感知功能。如果您的 Retriever
具有需要处理的状态,请使用 3 参数 generate
重载。
但是,如果您正在进行 HTTP 调用,我建议您研究一下 Retrofit 之类的东西,它与 RxJava 配合得很好。