在 RxJava 中调用 Observable 的任何订阅者回调之前执行操作

Do an action before any subscriber callback of an Observable is called in RxJava

我有一个 Observable 链 (RxJava 1),我想在 observable 完成它的工作时(就在它发出一些东西之前)做一些一次性的动作,但是 在调用任何订阅者回调之前(我想更新一些AtomicBoolean值,并让所有订阅者看到新值)。

我以线程安全的方式执行此操作的最佳选择是什么?

我检查了doOnTerminatedoOnCompleteddoAfterTerminate 但似乎无法保证顺序:我放了一些日志,有时会在任何这些方法之前执行第一个订阅者回调。

有没有什么方法可以明确地说 "I want this to be executed before any callback",或者我是否只需要在 return 从我的实用程序方法中订阅它之前作为第一个订阅者订阅 Observable,以及更新该订阅者中的 AtomicBoolean

您可以在 return 到订阅者的观察者链中使用 doOnNext() 运算符。

AtomicBoolean firstCome = new AtomicBoolean( false );
...
Observable<SomeData> observerChain =
  emitSomeData()
    .doOnNext( ignored -> firstCome.set( true ) )
    .share();
return observerChain;

编辑:使用 share() 运算符允许多个订阅者。

回答我自己的问题:这段代码有点复杂,不是我的,所以问的时候没看懂。

基本上代码如下; observable 发出 1 或 2 个值。 该代码依赖于订阅者的 onNext 回调。大多数时候(令人惊讶地)它工作正常,即 onNext 会看到原作者想要的布尔值,但有时 onAfterTerminate 只会在 onNext 回调。

     private Observable<...> getData() {
        return dataStore.getData()
              .doOnNext(...)
              .map(...)
              .doOnSubscribe(() -> myBoolean = true)
              .doAfterTerminate(() -> myBoolean = false)
              .map(...)
      }

    Subscription mySub = getData()
            .observeOn(AndroidSchedulers.mainThread(), true)
            .subscribe(onNextDoSomethingWithABoolean,
               onErrorDoSomethingWithABoolean);

主要修复是向订阅添加 onCompleted 回调并使用那里的布尔值:

    Subscription mySub = getData()
            .observeOn(AndroidSchedulers.mainThread(), true)
            .subscribe(onNextDoSomethingWithABoolean,
              onErrorDoSomethingWithABoolean,
              onCompletedDoSomethingWithTheBoolean);

除此之外我还:

  • boolean 更改为 AtomicBoolean
  • 已将 doAfterTerminate 更改为 doOnTerminate

无论如何,这段代码可能会被重写以完全避免可变状态。