如何使用 Project Reactor 中的 Sink 自定义 subscribe() 行为?
How to customize subscribe() behaviour with a Sink in Project Reactor?
对于 Flux
,您可以指定订阅时发生的自定义操作。例如 Flux.create(emitter -> someApi.setCallback(emitter::next))
将在订阅时设置一些 API 挂钩。
我们怎样才能对接收器进行这样的自定义订阅操作?例如。 Sinks.unsafe().many().unicast().onBackpressureBuffer(someAction)
?
我已经设法使用 Flux.concat(Mono.fromRunnable(someAction), sink)
使其工作,但我想这会增加不必要的开销,所以不太理想。
您可以使用 doOnSubscribe
运算符:
sink.asFlux().doOnSubscribe(someAction)
对于 Flux
,您可以指定订阅时发生的自定义操作。例如 Flux.create(emitter -> someApi.setCallback(emitter::next))
将在订阅时设置一些 API 挂钩。
我们怎样才能对接收器进行这样的自定义订阅操作?例如。 Sinks.unsafe().many().unicast().onBackpressureBuffer(someAction)
?
我已经设法使用 Flux.concat(Mono.fromRunnable(someAction), sink)
使其工作,但我想这会增加不必要的开销,所以不太理想。
您可以使用 doOnSubscribe
运算符:
sink.asFlux().doOnSubscribe(someAction)