Spring Integration: how to consume a QueueChannel on demand as a backpressure-aware reactive (Flux) pipeline
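I have a Spring Integration QueueChannel that I want to consume in a backpressure-aware Flux pipeline:
- Prefetch n messages from the queue.
- Make an asynchronous call to an external system, such as fun remoteCall(message: Message): Mono<Void>.
- Fetch the next message from the queue only after the external call completes.
I do not want to use a Poller with a scheduler that pulls messages from the queue ahead of time.
What is the best way to do this with the latest Spring Integration and the Java/Kotlin DSL, including error recovery?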
See IntegrationReactiveUtils.messageChannelToFlux():
/**
* Adapt a provided {@link MessageChannel} into a {@link Flux} source:
* - a {@link org.springframework.integration.channel.FluxMessageChannel}
* is returned as is because it is already a {@link Publisher};
* - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
* for the {@link Sinks.Many#tryEmitNext(Object)} which is returned from this method;
* - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
* {@link #messageSourceToFlux(MessageSource)}.
* @param messageChannel the {@link MessageChannel} to adapt.
* @param <T> the expected payload type.
* @return a {@link Flux} which uses a provided {@link MessageChannel} as a source for events to publish.
*/
@SuppressWarnings("unchecked")
public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {
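For the three requirements in the question, one possible sketch is to feed the resulting Flux into flatMap with a concurrency limit: Reactor then requests n messages up front and asks for one more each time an inner call completes. The remoteCall stub, the class and method names, the in-flight counters, and the "swallow the error and continue" recovery policy are all assumptions made for illustration, not part of Spring Integration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class QueueChannelBackpressureSketch {

    // Hypothetical stand-in for the remote call from the question.
    static Mono<Void> remoteCall(Message<String> message) {
        return Mono.delay(Duration.ofMillis(50)).then();
    }

    // Polls the queue only on downstream demand; at most n remote calls run at once.
    // The two counters exist only so the demo can observe the concurrency.
    static Flux<Message<String>> consume(QueueChannel queue, int n,
            AtomicInteger inFlight, AtomicInteger maxInFlight) {
        return IntegrationReactiveUtils.<String>messageChannelToFlux(queue)
                .flatMap(message -> remoteCall(message)
                        .doOnSubscribe(s ->
                                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
                        // Runs before the terminal signal reaches flatMap.
                        .doOnTerminate(inFlight::decrementAndGet)
                        // Assumed policy: ignore a failed call so the pipeline keeps running.
                        .onErrorResume(error -> Mono.empty())
                        .thenReturn(message),
                        n);
    }

    // Demo helper: preloads the queue, drains it, and reports the peak number of calls in flight.
    public static int maxInFlight(int total, int n) {
        QueueChannel queue = new QueueChannel();
        for (int i = 0; i < total; i++) {
            queue.send(new GenericMessage<>("message-" + i));
        }
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        // The source Flux never completes on its own, so the demo stops after 'total' messages.
        consume(queue, n, inFlight, max).take(total).blockLast(Duration.ofSeconds(10));
        return max.get();
    }
}
```

In a real application you would subscribe to the Flux returned by consume() instead of blocking, and replace the onErrorResume policy with whatever fits (retry, dead-letter channel, and so on).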
Then you can use IntegrationFlows.from(Publisher):
/**
* Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain
* and subscribe it to the provided {@link Publisher}.
* @param publisher the {@link Publisher} to subscribe to.
* @return new {@link IntegrationFlowBuilder}.
*/
public static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {
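Putting the two methods together might look like the sketch below. It assumes Spring Integration 5.3 to 5.5 (in 6.x the same factory is exposed as IntegrationFlow.from(...)), registers the flow dynamically through IntegrationFlowContext to keep the example in one file, and uses a plain collecting handler as a placeholder for real processing. The class name, the run helper, and the empty Config class are made up for the example. How many messages the flow requests from the queue at a time is decided inside FluxMessageChannel, so treat this as wiring only, not as a guarantee of a specific prefetch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.support.GenericMessage;

public class PublisherFlowSketch {

    // Minimal context that only enables the Spring Integration infrastructure.
    @Configuration
    @EnableIntegration
    public static class Config {
    }

    // Sends the payloads through a QueueChannel-backed reactive flow and returns what the handler saw.
    public static List<Object> run(List<String> payloads) throws InterruptedException {
        List<Object> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(payloads.size());
        QueueChannel queue = new QueueChannel();
        payloads.forEach(payload -> queue.send(new GenericMessage<>(payload)));

        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(Config.class)) {
            IntegrationFlow flow = IntegrationFlows
                    // The queue is adapted to a Flux and polled as the flow requests messages.
                    .from(IntegrationReactiveUtils.<Object>messageChannelToFlux(queue))
                    // Placeholder handler: record the payload and count it.
                    .handle(message -> {
                        received.add(message.getPayload());
                        latch.countDown();
                    })
                    .get();
            context.getBean(IntegrationFlowContext.class).registration(flow).register();
            latch.await(10, TimeUnit.SECONDS);
        }
        return received;
    }
}
```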