Spring 集成 DSL - 与网关块线程的组合

Spring Integration DSL - composition with gateway blocks thread

我正在尝试了解如何将 IntegrationFlows 构建为单元,并将它们连接起来。

我设置了一个非常简单的处理集成流程:


IntegrationFlow processingFlow = f -> f
                .<String>handle((p, h) -> process(p))
                .log();

        flowContext.registration(processingFlow)
                .id("testProcessing")
                .autoStartup(false)
                .register();

处理很简单:

   public String process(String process) {
            return process + " has been processed";
    }

然后我从一个源组成一个流,使用 .gateway() 将源加入处理:

  MessageChannel beginningChannel = MessageChannels.direct("beginning").get();

        StandardIntegrationFlow composedFlow = IntegrationFlows
                .from(beginningChannel)
                .gateway(processingFlow)
                .log()
                .get();

        flowContext.registration(composedFlow)
                .id("testComposed")
                .autoStartup(false)
                .addBean(processingFlow)
                .register();

然后我开始流程并发送几条消息:

 composedFlow.start();

 beginningChannel.send(MessageBuilder.withPayload(new String("first string")).build());
 beginningChannel.send(MessageBuilder.withPayload(new String("second string")).build());

日志记录处理程序确认已为第一条消息调用 handle 方法,但主线程随后处于空闲状态,第二条消息永远不会被处理。

这不是从构建块组成集成流的正确方法吗?使用频道这样做需要将频道注册为 bean,我正在尝试动态地完成所有这些工作。

processingFlow中必须是logAndReply()。请参阅他们的 JavaDocs 了解不同之处。流程末尾的 log() 使其成为 one-way。这就是为什么你被阻止,因为网关等待回复,但没有人根据你当前的流定义。不幸的是,我们无法从框架级别确定这一点:在某些情况下,您可能确实没有 return 根据某些路由或过滤逻辑。网关可以配置回复超时。默认情况下它是无限的。