Spring Integration: How to consume a QueueChannel on demand as a backpressure-aware reactive (Flux) pipeline

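I have a Spring Integration QueueChannel that I want to consume in a backpressure-aware Flux pipeline:

  1. Prefetch n messages from the queue.
  2. Make an asynchronous call to an external system, e.g. fun remoteCall(message: Message): Mono<Void>
  3. Pull the next message from the queue once the external call has completed.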

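I do not want to use a Poller with a scheduler, because that would pull messages from the queue ahead of time.

What is the best way to do this with the latest Spring Integration and the Java/Kotlin DSL, including error recovery and the like?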

See IntegrationReactiveUtils.messageChannelToFlux():

/**
 * Adapt a provided {@link MessageChannel} into a {@link Flux} source:
 * - a {@link org.springframework.integration.channel.FluxMessageChannel}
 * is returned as is because it is already a {@link Publisher};
 * - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
 * for the {@link Sinks.Many#tryEmitNext(Object)} which is returned from this method;
 * - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
 * {@link #messageSourceToFlux(MessageSource)}.
 * @param messageChannel the {@link MessageChannel} to adapt.
 * @param <T> the expected payload type.
 * @return a {@link Flux} which uses a provided {@link MessageChannel} as a source for events to publish.
 */
@SuppressWarnings("unchecked")
public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {

You can then use IntegrationFlows.from(Publisher):

/**
 * Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain
 * and subscribe it to the provided {@link Publisher}.
 * @param publisher the {@link Publisher} to subscribe to.
 * @return new {@link IntegrationFlowBuilder}.
 */
public static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {
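
For the three steps in the question, a minimal sketch might look like the following. It skips the IntegrationFlows DSL and consumes the Flux returned by messageChannelToFlux() directly. Everything apart from messageChannelToFlux(), QueueChannel and the Reactor operators is an assumption made for illustration: the class name, the consume() helper, the processed list, and the remoteCall stub that simulates the external system with a 50 ms delay. As far as I understand, flatMap with a concurrency argument requests that many messages up front and asks for one more each time an inner Mono completes, which is what bounds the number of messages taken from the queue.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class QueueChannelFluxSketch {

    // Hypothetical stand-in for the external call from the question:
    // completes after 50 ms and records the payload it handled.
    static Mono<Void> remoteCall(Message<String> message, List<String> processed) {
        return Mono.delay(Duration.ofMillis(50))
                .doOnNext(tick -> processed.add(message.getPayload()))
                .then();
    }

    // Hypothetical helper: subscribes to the queue with at most maxInFlight
    // remote calls running at the same time.
    static Disposable consume(QueueChannel queue, int maxInFlight, List<String> processed) {
        // The pollable channel is adapted to a Flux that is driven by subscriber demand.
        Flux<Message<String>> source = IntegrationReactiveUtils.messageChannelToFlux(queue);
        return source
                .flatMap(message -> remoteCall(message, processed)
                                // Assumed recovery policy: drop the failed message
                                // so that one error does not cancel the pipeline.
                                .onErrorResume(error -> Mono.empty()),
                        maxInFlight)
                .subscribe();
    }

    public static void main(String[] args) throws InterruptedException {
        QueueChannel queue = new QueueChannel();
        for (int i = 1; i <= 10; i++) {
            queue.send(new GenericMessage<>("msg-" + i));
        }
        List<String> processed = new CopyOnWriteArrayList<>();
        Disposable subscription = consume(queue, 3, processed);
        Thread.sleep(1000); // give the simulated calls time to finish
        subscription.dispose();
        System.out.println("processed: " + processed);
    }
}
```

The per-message onErrorResume is only one possible recovery policy. Retrying the inner Mono or routing the failed message to an error channel would go in the same place.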