在 Reactor Sinks.Many() 中相当于 EmitterProcessor onCancel
In Reactor Sinks.Many() what is equivalent to EmitterProcessor onCancel
根据 here 我以前有代码
EmitterProcessor<String> emitter = EmitterProcessor.create();
FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
sink.onCancel(() -> {
cancelSink(id, request);
});
例如,当 rSocket
浏览器打开会话并请求一些数据时,调用 EmitterProcessor
当客户端关闭浏览器时 publisher
like
Flux<String> out = Flux
.from(emitter
.log(log.getName()));
会知道 Flux 订阅者已被取消(当浏览器关闭时)并且会调用 onCancel
句柄。
有了Sinks.Many()
我已经实现了
Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.asFlux().doOnCancel(() -> {
cancelSink(id, request);
});
Flux<String> out = Flux
.from(sink.asFlux()
.log(log.getName()));
并且字符串通过流量发布到浏览器,但是当客户端关闭会话时,不再有 onCancel
来处理一些整理工作。
这看起来像是 discussed here and also here,但我不明白解决方案。请问是什么?
sink.asFlux().doOnCancel(...)
和 sink.asFlux()
是两个不同的实例。您没有重复使用已设置取消处理逻辑的那个,这就是为什么您没有观察到 out
变量上的 cancelSink
清理。
做更多类似的事情:
Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxWithCancelSupport = sink.asFlux().doOnCancel(() -> {
cancelSink(id, request);
});
Flux<String> out = fluxWithCancelSupport
.log(log.getName()));
(PS: 你不需要 Flux.from(sink.asFlux())
因为后者已经给了你 Flux
).
根据 here 我以前有代码
EmitterProcessor<String> emitter = EmitterProcessor.create();
FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
sink.onCancel(() -> {
cancelSink(id, request);
});
例如,当 rSocket
浏览器打开会话并请求一些数据时,调用 EmitterProcessor
当客户端关闭浏览器时 publisher
like
Flux<String> out = Flux
.from(emitter
.log(log.getName()));
会知道 Flux 订阅者已被取消(当浏览器关闭时)并且会调用 onCancel
句柄。
有了Sinks.Many()
我已经实现了
Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.asFlux().doOnCancel(() -> {
cancelSink(id, request);
});
Flux<String> out = Flux
.from(sink.asFlux()
.log(log.getName()));
并且字符串通过流量发布到浏览器,但是当客户端关闭会话时,不再有 onCancel
来处理一些整理工作。
这看起来像是 discussed here and also here,但我不明白解决方案。请问是什么?
sink.asFlux().doOnCancel(...)
和 sink.asFlux()
是两个不同的实例。您没有重复使用已设置取消处理逻辑的那个,这就是为什么您没有观察到 out
变量上的 cancelSink
清理。
做更多类似的事情:
Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxWithCancelSupport = sink.asFlux().doOnCancel(() -> {
cancelSink(id, request);
});
Flux<String> out = fluxWithCancelSupport
.log(log.getName()));
(PS: 你不需要 Flux.from(sink.asFlux())
因为后者已经给了你 Flux
).