使用 Observables 进行轮询的手动递归的更新版本是什么?
What's the updated version of Manual Recursion to polling using Observables?
一些背景信息:
在此 link: https://github.com/ReactiveX/RxJava/issues/448 @ben-lesh 提出了一个使用 Observables 进行轮询的手动递归实现。
然而在最新的 RxJava 版本中没有 OnSubscribeFunc
。
这是我当前的实现:
Observable.create(new Observable.OnSubscribe<Item>() {
@Override
public void call(final Subscriber<? super Item> innerSubscriber) {
Schedulers.io().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
searchObservable()
.doOnNext(new Action1<Item>() {
@Override
public void call(Item item) {
innerSubscriber.onNext(item);
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (throwable != null) {
innerSubscriber.onError(throwable);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
innerSubscriber.onCompleted();
}
}).subscribe(); // Set subscriber?
}
}, initialDelay, pollingInterval, TimeUnit.MINUTES);
}
})
.subscribeOn(Schedulers.io()) // performs networking on background thread
.observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread
.subscribe(subscriber); // The subscriber
searchObservable
执行服务请求。这在第一个 运行 工作正常,这是数据传递给 subscriber
。然而,在等待pollingInterval
之后,数据returns和doOnNext
被执行但数据没有传递给UI。我是否需要在 schedulePeriodically
占用的 Action
中设置任何订阅者?
它停止是因为您正在调用 innerSubscriber.onCompleted
,它在第一个 运行 上终止序列。有一些标准的运算符可以让您获得相同的效果,而无需创建自定义 Observable:
Observable.interval(initialDelay, pollingInterval, TimeUnit.MINUTES, Schedulers.io())
.onBackpressureBuffer()
.concatMap(v -> searchObservable())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
(注:这里不需要 subscribeOn(),因为 interval 无论如何都会在 Schedulers.io() 上发射。)
这对我来说很好,并且遵循手动递归的范例:
public void manualRecursionPollingStrategy(Subscriber<Item> subscriber, Scheduler observeOnScheduler, long initialDelay, long pollingInterval) {
Observable.create(new Observable.OnSubscribe<Item>() {
@Override
public void call(final Subscriber<? super Item> innerSubscriber) {
Schedulers.newThread().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
searchByHashtagObservable()
.subscribeOn(Schedulers.io()) // performs networking on background thread
.observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the
// UI thread
.subscribe(
new Action1<Item>() {
@Override
public void call(Item item) {
subscriber.onNext(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (throwable != null) {
subscriber.onError(throwable);
}
}
},
new Action0() {
@Override
public void call() {
subscriber.onCompleted();
}
}
)
}
}, initialDelay, pollingInterval, TimeUnit.MINUTES);
}
})
.observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the UI thread
.subscribe(subscriber);
请注意,我订阅了 searchByHashtagObservable()
并调用了作为参数传递的 Subscriber
的 onNext
、onError
和 onCompleted
。
谢谢!
一些背景信息:
在此 link: https://github.com/ReactiveX/RxJava/issues/448 @ben-lesh 提出了一个使用 Observables 进行轮询的手动递归实现。
然而在最新的 RxJava 版本中没有 OnSubscribeFunc
。
这是我当前的实现:
Observable.create(new Observable.OnSubscribe<Item>() {
@Override
public void call(final Subscriber<? super Item> innerSubscriber) {
Schedulers.io().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
searchObservable()
.doOnNext(new Action1<Item>() {
@Override
public void call(Item item) {
innerSubscriber.onNext(item);
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (throwable != null) {
innerSubscriber.onError(throwable);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
innerSubscriber.onCompleted();
}
}).subscribe(); // Set subscriber?
}
}, initialDelay, pollingInterval, TimeUnit.MINUTES);
}
})
.subscribeOn(Schedulers.io()) // performs networking on background thread
.observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread
.subscribe(subscriber); // The subscriber
searchObservable
执行服务请求。这在第一个 运行 工作正常,这是数据传递给 subscriber
。然而,在等待pollingInterval
之后,数据returns和doOnNext
被执行但数据没有传递给UI。我是否需要在 schedulePeriodically
占用的 Action
中设置任何订阅者?
它停止是因为您正在调用 innerSubscriber.onCompleted
,它在第一个 运行 上终止序列。有一些标准的运算符可以让您获得相同的效果,而无需创建自定义 Observable:
Observable.interval(initialDelay, pollingInterval, TimeUnit.MINUTES, Schedulers.io())
.onBackpressureBuffer()
.concatMap(v -> searchObservable())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
(注:这里不需要 subscribeOn(),因为 interval 无论如何都会在 Schedulers.io() 上发射。)
这对我来说很好,并且遵循手动递归的范例:
public void manualRecursionPollingStrategy(Subscriber<Item> subscriber, Scheduler observeOnScheduler, long initialDelay, long pollingInterval) {
Observable.create(new Observable.OnSubscribe<Item>() {
@Override
public void call(final Subscriber<? super Item> innerSubscriber) {
Schedulers.newThread().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
searchByHashtagObservable()
.subscribeOn(Schedulers.io()) // performs networking on background thread
.observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the
// UI thread
.subscribe(
new Action1<Item>() {
@Override
public void call(Item item) {
subscriber.onNext(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (throwable != null) {
subscriber.onError(throwable);
}
}
},
new Action0() {
@Override
public void call() {
subscriber.onCompleted();
}
}
)
}
}, initialDelay, pollingInterval, TimeUnit.MINUTES);
}
})
.observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the UI thread
.subscribe(subscriber);
请注意,我订阅了 searchByHashtagObservable()
并调用了作为参数传递的 Subscriber
的 onNext
、onError
和 onCompleted
。
谢谢!