在 RxJava 中调用 Observable 的任何订阅者回调之前执行操作
Do an action before any subscriber callback of an Observable is called in RxJava
我有一个 Observable
链 (RxJava 1),我想在 observable 完成它的工作时(就在它发出一些东西之前)做一些一次性的动作,但是 在调用任何订阅者回调之前(我想更新一些AtomicBoolean
值,并让所有订阅者看到新值)。
我以线程安全的方式执行此操作的最佳选择是什么?
我检查了doOnTerminate
、doOnCompleted
、doAfterTerminate
但似乎无法保证顺序:我放了一些日志,有时会在任何这些方法之前执行第一个订阅者回调。
有没有什么方法可以明确地说 "I want this to be executed before any callback",或者我是否只需要在 return
从我的实用程序方法中订阅它之前作为第一个订阅者订阅 Observable
,以及更新该订阅者中的 AtomicBoolean
?
您可以在 return 到订阅者的观察者链中使用 doOnNext()
运算符。
AtomicBoolean firstCome = new AtomicBoolean( false );
...
Observable<SomeData> observerChain =
emitSomeData()
.doOnNext( ignored -> firstCome.set( true ) )
.share();
return observerChain;
编辑:使用 share()
运算符允许多个订阅者。
回答我自己的问题:这段代码有点复杂,不是我的,所以问的时候没看懂。
基本上代码如下; observable 发出 1 或 2 个值。
该代码依赖于订阅者的 onNext
回调。大多数时候(令人惊讶地)它工作正常,即 onNext
会看到原作者想要的布尔值,但有时 onAfterTerminate
只会在 onNext
回调。
private Observable<...> getData() {
return dataStore.getData()
.doOnNext(...)
.map(...)
.doOnSubscribe(() -> myBoolean = true)
.doAfterTerminate(() -> myBoolean = false)
.map(...)
}
Subscription mySub = getData()
.observeOn(AndroidSchedulers.mainThread(), true)
.subscribe(onNextDoSomethingWithABoolean,
onErrorDoSomethingWithABoolean);
主要修复是向订阅添加 onCompleted
回调并使用那里的布尔值:
Subscription mySub = getData()
.observeOn(AndroidSchedulers.mainThread(), true)
.subscribe(onNextDoSomethingWithABoolean,
onErrorDoSomethingWithABoolean,
onCompletedDoSomethingWithTheBoolean);
除此之外我还:
- 从
boolean
更改为 AtomicBoolean
- 已将
doAfterTerminate
更改为 doOnTerminate
无论如何,这段代码可能会被重写以完全避免可变状态。
我有一个 Observable
链 (RxJava 1),我想在 observable 完成它的工作时(就在它发出一些东西之前)做一些一次性的动作,但是 在调用任何订阅者回调之前(我想更新一些AtomicBoolean
值,并让所有订阅者看到新值)。
我以线程安全的方式执行此操作的最佳选择是什么?
我检查了doOnTerminate
、doOnCompleted
、doAfterTerminate
但似乎无法保证顺序:我放了一些日志,有时会在任何这些方法之前执行第一个订阅者回调。
有没有什么方法可以明确地说 "I want this to be executed before any callback",或者我是否只需要在 return
从我的实用程序方法中订阅它之前作为第一个订阅者订阅 Observable
,以及更新该订阅者中的 AtomicBoolean
?
您可以在 return 到订阅者的观察者链中使用 doOnNext()
运算符。
AtomicBoolean firstCome = new AtomicBoolean( false );
...
Observable<SomeData> observerChain =
emitSomeData()
.doOnNext( ignored -> firstCome.set( true ) )
.share();
return observerChain;
编辑:使用 share()
运算符允许多个订阅者。
回答我自己的问题:这段代码有点复杂,不是我的,所以问的时候没看懂。
基本上代码如下; observable 发出 1 或 2 个值。
该代码依赖于订阅者的 onNext
回调。大多数时候(令人惊讶地)它工作正常,即 onNext
会看到原作者想要的布尔值,但有时 onAfterTerminate
只会在 onNext
回调。
private Observable<...> getData() {
return dataStore.getData()
.doOnNext(...)
.map(...)
.doOnSubscribe(() -> myBoolean = true)
.doAfterTerminate(() -> myBoolean = false)
.map(...)
}
Subscription mySub = getData()
.observeOn(AndroidSchedulers.mainThread(), true)
.subscribe(onNextDoSomethingWithABoolean,
onErrorDoSomethingWithABoolean);
主要修复是向订阅添加 onCompleted
回调并使用那里的布尔值:
Subscription mySub = getData()
.observeOn(AndroidSchedulers.mainThread(), true)
.subscribe(onNextDoSomethingWithABoolean,
onErrorDoSomethingWithABoolean,
onCompletedDoSomethingWithTheBoolean);
除此之外我还:
- 从
boolean
更改为AtomicBoolean
- 已将
doAfterTerminate
更改为doOnTerminate
无论如何,这段代码可能会被重写以完全避免可变状态。