使用 Observables 进行轮询的手动递归的更新版本是什么?

What's the updated version of Manual Recursion to polling using Observables?

一些背景信息: 在此 link: https://github.com/ReactiveX/RxJava/issues/448 @ben-lesh 提出了一个使用 Observables 进行轮询的手动递归实现。 然而在最新的 RxJava 版本中没有 OnSubscribeFunc

这是我当前的实现:

Observable.create(new Observable.OnSubscribe<Item>() {
        @Override
        public void call(final Subscriber<? super Item> innerSubscriber) {

            Schedulers.io().createWorker()
                      .schedulePeriodically(new Action0() {
                          @Override
                          public void call() {
                              searchObservable()
                                      .doOnNext(new Action1<Item>() {
                                          @Override
                                          public void call(Item item) {
                                              innerSubscriber.onNext(item);
                                          }
                                      })
                                      .doOnError(new Action1<Throwable>() {
                                          @Override
                                          public void call(Throwable throwable) {
                                              if (throwable != null) {
                                                  innerSubscriber.onError(throwable);
                                              }
                                          }
                                      })
                                      .doOnCompleted(new Action0() {
                                          @Override
                                          public void call() {
                                              innerSubscriber.onCompleted();
                                          }
                                      }).subscribe(); // Set subscriber?
                          }
                      }, initialDelay, pollingInterval, TimeUnit.MINUTES);
        }
    })
            .subscribeOn(Schedulers.io()) // performs networking on background thread
            .observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread
            .subscribe(subscriber); // The subscriber

searchObservable 执行服务请求。这在第一个 运行 工作正常,这是数据传递给 subscriber。然而,在等待pollingInterval之后,数据returns和doOnNext被执行但数据没有传递给UI。我是否需要在 schedulePeriodically 占用的 Action 中设置任何订阅者?

它停止是因为您正在调用 innerSubscriber.onCompleted,它在第一个 运行 上终止序列。有一些标准的运算符可以让您获得相同的效果,而无需创建自定义 Observable:

Observable.interval(initialDelay, pollingInterval, TimeUnit.MINUTES, Schedulers.io())
.onBackpressureBuffer()
.concatMap(v -> searchObservable())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);

(注:这里不需要 subscribeOn(),因为 interval 无论如何都会在 Schedulers.io() 上发射。)

这对我来说很好,并且遵循手动递归的范例:

public void manualRecursionPollingStrategy(Subscriber<Item> subscriber, Scheduler observeOnScheduler, long initialDelay, long pollingInterval) {
            Observable.create(new Observable.OnSubscribe<Item>() {
                @Override
                public void call(final Subscriber<? super Item> innerSubscriber) {
                    Schedulers.newThread().createWorker()
                              .schedulePeriodically(new Action0() {
                                  @Override
                                  public void call() {
                                              searchByHashtagObservable()
                                                      .subscribeOn(Schedulers.io()) // performs networking on background thread
                                                      .observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the
                                                              // UI thread
                                                      .subscribe(
                                                              new Action1<Item>() {
                                                                  @Override
                                                                  public void call(Item item) {
                                                                      subscriber.onNext(item);
                                                                  }
                                                              },
                                                              new Action1<Throwable>() {
                                                                  @Override
                                                                  public void call(Throwable throwable) {
                                                                      if (throwable != null) {
                                                                          subscriber.onError(throwable);
                                                                      }
                                                                  }
                                                              },
                                                              new Action0() {
                                                                  @Override
                                                                  public void call() {
                                                                      subscriber.onCompleted();
                                                                  }
                                                              }
                                                      )
                                  }
                              }, initialDelay, pollingInterval, TimeUnit.MINUTES);
                }
            })
              .observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the UI thread
              .subscribe(subscriber);

请注意,我订阅了 searchByHashtagObservable() 并调用了作为参数传递的 SubscriberonNextonErroronCompleted

谢谢!