如何捕获反应流取消信号?

How to capture the reactive stream cancel signal?

我想在流因任何原因(包括取消)终止后做最后的事情,我 找到了 doFinally 方法,但是在取消时它不起作用,因为 https://github.com/reactor/reactor-core/issues/1090#issuecomment-367633241 show :

Cancellation travels only upstream

那么,如何捕捉取消信号呢?

有我的代码:

    public Mono<Void> myFunction() {
        return Mono.just("hello")
                .flatMap(s -> foo(s))
                .doFinally(signalType -> {
                    // do something finally, but the doFinally won't be called
                    System.out.println(signalType);
                });
    }

    // some other library's function that I cant not modify any way
    public Mono<Void> foo(String s) {
        // return a reactive stream, and will cancel it after it be subscribed, like:
        return Mono.just(s)
                .doOnSubscribe(subscription -> subscription.cancel())
                .then();
    }

你不能在那个特定的安排中,因为 foo() method/library 似乎自己管理订阅(取消),而不是把这个责任留给消费者。因此,这样管理订阅不一定是好事。