如何在 Subscribe/Unsubscribe 上执行 Completable

How to execute Completable on Subscribe/Unsubscribe

有我的方法:

public void openWorkshift(WorkshiftSettings workshiftSettings, Subscriber<WorkshiftSettings> subscriber) {
    api.openWorkshift(workshiftSettings)
            .compose(RxOperatorsHelpers.additionalStacktrace())
            .doOnSubscribe(() -> actionsSystem.registerAction(...).await()) // <-
            .doOnUnsubscribe(() -> actionsSystem.unregisterAction(...).await()); // <-
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .subscribe(subscriber);
}

其中 ActionsSystem.registerAction(...)/ActionsSystem.unregisterActions(...) 看起来像这样:

public Completable registerAction(OperatorAction action) {
    return Completable.fromAction(() -> actions.add(action));
}

public Completable unregisterAction(OperatorAction action) {
    return Completable.fromAction(() -> actions.remove(action));
}

如你所见,我使用 .await() 在源 Observable 的流中执行 Completable。感觉像是错误的解决方案。 怎样才能更优雅?

由于您的 Completable 执行的操作微不足道,您只需将它们的代码内联到 doOnSubscribedoOnUnsubscribe:

        .doOnSubscribe(() -> actions.add(action))
        .doOnUnsubscribe(() -> actions.remove(action))

您可以通过从可完成的 andThen 其余 Observable 序列开始来避免 doOnSubscribe

actionsSyste.registerAction(...)
.andThen(api.openWorkshift(workshiftSettings)
        .compose(RxOperatorsHelpers.additionalStacktrace())
        .doOnUnsubscribe(() -> actionsSyste.unregisterAction(...).await())
        .subscribeOn(ioScheduler)
        .observeOn(uiScheduler)
        .doOnError(this::handleError)
 )
 .subscribe(...)

目前,无法在下游取消订阅时执行 Completable,并且在序列可能正常终止或异常终止时也没有简单的方法来执行它。

您可以使用 Observable.defer。此运算符延迟创建可观察对象,直到它被订阅:

Observable observable = Observable.defer(() -> {
        actions.add(action);
        api.openWorkshift(workshiftSettings)
}).compose(RxOperatorsHelpers.additionalStacktrace())
    .subscribeOn(ioScheduler)
    .observeOn(uiScheduler)
    .doOnError(this::handleError);

然后利用Subscription。根据documentation Subscription.create():

Creates and returns a Subscription that invokes the given Action0 when unsubscribed.

所以基本上你需要做的是:

Subscription subscription = Subscriptions.create(new Action0() {
    @Override
    public void call() {
        actionsSyste.unregisterAction(...);
    }
});    
subscriber.add(subscription);
observable.subscribe(subscriber);