Observable 的 interval 与 flatMap 组合不起作用

Observable interval flatMap combination doesn't work

我是 RxJava 2 的新手,正在尝试以固定的时间间隔在后台线程执行 someMethod,并在 UI 线程上使用其结果。有人能指出我的代码哪里出错了吗?或者更好的是,能否提供一段能够实现我需求的最佳代码?

// Question snippet: attempts to stop the periodic updates before delegating to super.onStop().
@Override
  protected void onStop() {
    // Pushes 10005 into `subject`. Nothing visible in this snippet subscribes to `subject`,
    // so this value does not reach the takeWhile predicate attached to `observable`.
    subject.onNext(Long.valueOf(10005));
    // NOTE(review): unsubscribeOn() returns a new Observable and the result is discarded here,
    // so this call does not dispose the subscription made in initAzimuthUpdater().
    observable.unsubscribeOn(Schedulers.io());
    super.onStop();
  }

// Question snippet: builds a 500 ms interval stream, turns every tick into the result of
// someMethod(), and logs the values from an Observer that is observed on the main thread.
private void initAzimuthUpdater() {
    subject = PublishSubject.create();

    // Tick source; takeWhile is meant to end the stream once the value 10005 is seen.
    observable = Observable.interval(500, TimeUnit.MILLISECONDS)
        .takeWhile(new Predicate<Long>() {
          @Override
          public boolean test(@NonNull Long aLong) throws Exception {
            Log.d(TAG, "xxxxxxxxxxxx test: " + aLong);
            // NOTE(review): `!=` between two Long objects compares references, not values.
            return aLong != Long.valueOf(10005);
          }
        });

    // The chain below is subscribed immediately; the Disposable handed to onSubscribe is
    // only logged, not stored, so nothing in this snippet can dispose the subscription later.
    observable.flatMap(new Function<Long, ObservableSource<Float>>() {
          @Override
          public ObservableSource<Float> apply(@NonNull Long aLong) throws Exception {
            // NOTE(review): this inner source emits a single value and never calls
            // e.onComplete().
            return PublishSubject.create(new ObservableOnSubscribe<Float>() {

              @Override
              public void subscribe(@NonNull ObservableEmitter<Float> e) throws Exception {
                e.onNext(someMethod());
              }
            });
          }
        }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Float>() {
          @Override
          public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "xxxxxxxx onSubscribe:" + System.currentTimeMillis());
            isRunning = true;
          }

          @Override
          public void onNext(@NonNull Float o) {
            Log.d(TAG, "xxxxxxxx onNext:" + System.currentTimeMillis());
            //update UI
          }

          @Override
          public void onError(@NonNull Throwable e) {
            Log.d(TAG, "xxxxxxxx onError:" + System.currentTimeMillis());
          }

          @Override
          public void onComplete() {
            Log.d(TAG, "xxxxxxxx onComplete:" + System.currentTimeMillis());
          }
        });

    // NOTE(review): mergeWith() returns a new Observable that is discarded here, so this line
    // has no effect on the stream subscribed above. `azimuthObservable` is not declared
    // anywhere in this snippet.
    subject.mergeWith(azimuthObservable);
  }

您应该使用 DisposableObserver 配合 subscribeWith,将返回的 Disposable 保存到 CompositeDisposable 中,然后在 onStop() 中调用 clear()。Observable 的运算符会返回一个新实例,因此如果忽略它们的返回值,对原始流不会产生任何影响。

// Holds the Disposables added by initAzimuthUpdater() so that onStop() can clear them together.
CompositeDisposable composite = new CompositeDisposable();

// Disposes every subscription registered in `composite`, then delegates to super.onStop().
@Override
protected void onStop() {
    // clear() rather than dispose(): the composite stays usable, so Disposables can be
    // added to it again afterwards.
    composite.clear();
    super.onStop();
}

/**
 * Starts the periodic updater: every 500 ms {@code someMethod()} is evaluated off the main
 * thread and its result is delivered to the observer on the Android main thread.
 *
 * <p>The resulting {@link Disposable} is added to {@code composite}; clearing the composite
 * stops the updates.
 */
private void initAzimuthUpdater() {
    // interval() ticks on the computation scheduler unless a scheduler is supplied, and a
    // downstream subscribeOn() does not move those emissions. Passing Schedulers.io() here is
    // what actually makes someMethod() run on an io thread.
    Disposable d = Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.io())
        // One tick -> one value. map() replaces flatMap(Observable.create(...)): the inner
        // sources created there never called onComplete(), so one open inner subscription
        // accumulated per tick.
        .map(new Function<Long, Float>() {
          @Override
          public Float apply(@NonNull Long tick) throws Exception {
            return someMethod();
          }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<Float>() {
          @Override
          public void onStart() {
            Log.d(TAG, "xxxxxxxx onSubscribe:" + System.currentTimeMillis());
            isRunning = true;
          }

          @Override
          public void onNext(@NonNull Float o) {
            Log.d(TAG, "xxxxxxxx onNext:" + System.currentTimeMillis());
            //update UI
          }

          @Override
          public void onError(@NonNull Throwable e) {
            // Log the throwable itself so the failure cause is not lost.
            Log.e(TAG, "xxxxxxxx onError:" + System.currentTimeMillis(), e);
          }

          @Override
          public void onComplete() {
            Log.d(TAG, "xxxxxxxx onComplete:" + System.currentTimeMillis());
          }
        });

    // Registered so that onStop() can dispose it via composite.clear().
    composite.add(d);
}