为什么 RxJava2 不支持 "Maybe.doOnDispose"?

Why "Maybe.doOnDispose" is not supported in RxJava2?

我在 RxJava2 中使用 Maybe class。

我注册了 doOnDispose 回调来检测 Dispose 事件,但它没有被触发。

Maybe.just("aaa")
    .doOnDispose({ /* do something */ })
    .subscribe( ... )

看了RxJava 2的代码,Maybe好像不支持doOnDispose

Maybe 是在 doOnDispose,

中创建 MaybePeek(不是 DoOnDisposeObserver)对象
@CheckReturnValue
@SchedulerSupport("none")
public final Maybe<T> doOnDispose(Action onDispose) {
    return RxJavaPlugins.onAssembly(new MaybePeek(this, Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION, (Action)ObjectHelper.requireNonNull(onDispose, "onDispose is null")));
}

protected void subscribeActual(MaybeObserver<? super T> observer) {
    this.source.subscribe(new MaybePeek.MaybePeekObserver(observer, this));
}

但是,Single 是创建 DoOnDisposeObserver,并且工作正常。

@CheckReturnValue
@SchedulerSupport("none")
public final Single<T> doOnDispose(Action onDispose) {
    ObjectHelper.requireNonNull(onDispose, "onDispose is null");
    return RxJavaPlugins.onAssembly(new SingleDoOnDispose(this, onDispose));
}

protected void subscribeActual(SingleObserver<? super T> s) {
    this.source.subscribe(new SingleDoOnDispose.DoOnDisposeObserver(s, this.onDispose));
}

为什么不支持 Maybe.doOnDispose

如文档所述 doOnDispose(Action onDispose)

Calls the dispose Action if the downstream disposes the sequence.

由于您的下游从不处理它,因此它永远不会调用。

Disposable disposable = Maybe.just("aaa")
    .doOnDispose({ /* do something */ })
    .subscribe( ... )

disposable.dispose();

现在应该调用 doOnDispose 中的操作。

请注意,如果完成流的时间少于进行下一个操作 (disposable.dispose()) 的时间,则不应调用 onDispose 操作。所以,为了验证它,你可以使用延迟:

Disposable disposable = Maybe.just("aaa")
    .delay(2000, TimeUnit.MILLISECONDS)
    .doOnDispose({ /* do something */ })
    .subscribe( ... )

disposable.dispose();

现在应该触发动作了。