在 Reactor Sinks.Many() 中相当于 EmitterProcessor onCancel

In Reactor Sinks.Many() what is equivalent to EmitterProcessor onCancel

根据 here 我以前有代码

EmitterProcessor<String> emitter = EmitterProcessor.create();
FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

sink.onCancel(() -> {
  cancelSink(id, request);
});

例如,当 rSocket 浏览器打开会话并请求一些数据时,调用 EmitterProcessor 当客户端关闭浏览器时 publisher like

Flux<String> out = Flux
    .from(emitter
    .log(log.getName())); 

会知道 Flux 订阅者已被取消(当浏览器关闭时)并且会调用 onCancel 句柄。

有了Sinks.Many()我已经实现了

Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

sink.asFlux().doOnCancel(() -> {
    cancelSink(id, request);
});

Flux<String> out = Flux
    .from(sink.asFlux()
    .log(log.getName()));

并且字符串通过流量发布到浏览器,但是当客户端关闭会话时,不再有 onCancel 来处理一些整理工作。

这看起来像是 discussed here and also here,但我不明白解决方案。请问是什么?

sink.asFlux().doOnCancel(...)sink.asFlux() 是两个不同的实例。您没有重复使用已设置取消处理逻辑的那个,这就是为什么您没有观察到 out 变量上的 cancelSink 清理。

做更多类似的事情:

Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

Flux<String> fluxWithCancelSupport = sink.asFlux().doOnCancel(() -> {
    cancelSink(id, request);
});

Flux<String> out = fluxWithCancelSupport
    .log(log.getName()));

(PS: 你不需要 Flux.from(sink.asFlux()) 因为后者已经给了你 Flux).