如何在 Subscribe/Unsubscribe 上执行 Completable
How to execute Completable on Subscribe/Unsubscribe
有我的方法:
public void openWorkshift(WorkshiftSettings workshiftSettings, Subscriber<WorkshiftSettings> subscriber) {
api.openWorkshift(workshiftSettings)
.compose(RxOperatorsHelpers.additionalStacktrace())
.doOnSubscribe(() -> actionsSystem.registerAction(...).await()) // <-
.doOnUnsubscribe(() -> actionsSystem.unregisterAction(...).await()); // <-
.subscribeOn(ioScheduler)
.observeOn(uiScheduler)
.doOnError(this::handleError)
.subscribe(subscriber);
}
其中 ActionsSystem.registerAction(...)
/ActionsSystem.unregisterActions(...)
看起来像这样:
public Completable registerAction(OperatorAction action) {
return Completable.fromAction(() -> actions.add(action));
}
public Completable unregisterAction(OperatorAction action) {
return Completable.fromAction(() -> actions.remove(action));
}
如你所见,我使用 .await()
在源 Observable 的流中执行 Completable
。感觉像是错误的解决方案。 怎样才能更优雅?
由于您的 Completable
执行的操作微不足道,您只需将它们的代码内联到 doOnSubscribe
和 doOnUnsubscribe
:
.doOnSubscribe(() -> actions.add(action))
.doOnUnsubscribe(() -> actions.remove(action))
您可以通过从可完成的 andThen
其余 Observable
序列开始来避免 doOnSubscribe
:
actionsSyste.registerAction(...)
.andThen(api.openWorkshift(workshiftSettings)
.compose(RxOperatorsHelpers.additionalStacktrace())
.doOnUnsubscribe(() -> actionsSyste.unregisterAction(...).await())
.subscribeOn(ioScheduler)
.observeOn(uiScheduler)
.doOnError(this::handleError)
)
.subscribe(...)
目前,无法在下游取消订阅时执行 Completable
,并且在序列可能正常终止或异常终止时也没有简单的方法来执行它。
您可以使用 Observable.defer
。此运算符延迟创建可观察对象,直到它被订阅:
Observable observable = Observable.defer(() -> {
actions.add(action);
api.openWorkshift(workshiftSettings)
}).compose(RxOperatorsHelpers.additionalStacktrace())
.subscribeOn(ioScheduler)
.observeOn(uiScheduler)
.doOnError(this::handleError);
然后利用Subscription
。根据documentation Subscription.create()
:
Creates and returns a Subscription that invokes the given Action0 when unsubscribed.
所以基本上你需要做的是:
Subscription subscription = Subscriptions.create(new Action0() {
@Override
public void call() {
actionsSyste.unregisterAction(...);
}
});
subscriber.add(subscription);
observable.subscribe(subscriber);
有我的方法:
public void openWorkshift(WorkshiftSettings workshiftSettings, Subscriber<WorkshiftSettings> subscriber) {
api.openWorkshift(workshiftSettings)
.compose(RxOperatorsHelpers.additionalStacktrace())
.doOnSubscribe(() -> actionsSystem.registerAction(...).await()) // <-
.doOnUnsubscribe(() -> actionsSystem.unregisterAction(...).await()); // <-
.subscribeOn(ioScheduler)
.observeOn(uiScheduler)
.doOnError(this::handleError)
.subscribe(subscriber);
}
其中 ActionsSystem.registerAction(...)
/ActionsSystem.unregisterActions(...)
看起来像这样:
public Completable registerAction(OperatorAction action) {
return Completable.fromAction(() -> actions.add(action));
}
public Completable unregisterAction(OperatorAction action) {
return Completable.fromAction(() -> actions.remove(action));
}
如你所见,我使用 .await()
在源 Observable 的流中执行 Completable
。感觉像是错误的解决方案。 怎样才能更优雅?
由于您的 Completable
执行的操作微不足道,您只需将它们的代码内联到 doOnSubscribe
和 doOnUnsubscribe
:
.doOnSubscribe(() -> actions.add(action))
.doOnUnsubscribe(() -> actions.remove(action))
您可以通过从可完成的 andThen
其余 Observable
序列开始来避免 doOnSubscribe
:
actionsSyste.registerAction(...)
.andThen(api.openWorkshift(workshiftSettings)
.compose(RxOperatorsHelpers.additionalStacktrace())
.doOnUnsubscribe(() -> actionsSyste.unregisterAction(...).await())
.subscribeOn(ioScheduler)
.observeOn(uiScheduler)
.doOnError(this::handleError)
)
.subscribe(...)
目前,无法在下游取消订阅时执行 Completable
,并且在序列可能正常终止或异常终止时也没有简单的方法来执行它。
您可以使用 Observable.defer
。此运算符延迟创建可观察对象,直到它被订阅:
Observable observable = Observable.defer(() -> {
actions.add(action);
api.openWorkshift(workshiftSettings)
}).compose(RxOperatorsHelpers.additionalStacktrace())
.subscribeOn(ioScheduler)
.observeOn(uiScheduler)
.doOnError(this::handleError);
然后利用Subscription
。根据documentation Subscription.create()
:
Creates and returns a Subscription that invokes the given Action0 when unsubscribed.
所以基本上你需要做的是:
Subscription subscription = Subscriptions.create(new Action0() {
@Override
public void call() {
actionsSyste.unregisterAction(...);
}
});
subscriber.add(subscription);
observable.subscribe(subscriber);