Spring 集成 DSL - 与网关块线程的组合
Spring Integration DSL - composition with gateway blocks thread
我正在尝试了解如何将 IntegrationFlows 构建为单元,并将它们连接起来。
我设置了一个非常简单的处理集成流程:
IntegrationFlow processingFlow = f -> f
.<String>handle((p, h) -> process(p))
.log();
flowContext.registration(processingFlow)
.id("testProcessing")
.autoStartup(false)
.register();
处理很简单:
public String process(String process) {
return process + " has been processed";
}
然后我从一个源组成一个流,使用 .gateway()
将源加入处理:
MessageChannel beginningChannel = MessageChannels.direct("beginning").get();
StandardIntegrationFlow composedFlow = IntegrationFlows
.from(beginningChannel)
.gateway(processingFlow)
.log()
.get();
flowContext.registration(composedFlow)
.id("testComposed")
.autoStartup(false)
.addBean(processingFlow)
.register();
然后我开始流程并发送几条消息:
composedFlow.start();
beginningChannel.send(MessageBuilder.withPayload(new String("first string")).build());
beginningChannel.send(MessageBuilder.withPayload(new String("second string")).build());
日志记录处理程序确认已为第一条消息调用 handle 方法,但主线程随后处于空闲状态,第二条消息永远不会被处理。
这不是从构建块组成集成流的正确方法吗?使用频道这样做需要将频道注册为 bean,我正在尝试动态地完成所有这些工作。
processingFlow
中必须是logAndReply()
。请参阅他们的 JavaDocs 了解不同之处。流程末尾的 log()
使其成为 one-way。这就是为什么你被阻止,因为网关等待回复,但没有人根据你当前的流定义。不幸的是,我们无法从框架级别确定这一点:在某些情况下,您可能确实没有 return 根据某些路由或过滤逻辑。网关可以配置回复超时。默认情况下它是无限的。
我正在尝试了解如何将 IntegrationFlows 构建为单元,并将它们连接起来。
我设置了一个非常简单的处理集成流程:
IntegrationFlow processingFlow = f -> f
.<String>handle((p, h) -> process(p))
.log();
flowContext.registration(processingFlow)
.id("testProcessing")
.autoStartup(false)
.register();
处理很简单:
public String process(String process) {
return process + " has been processed";
}
然后我从一个源组成一个流,使用 .gateway()
将源加入处理:
MessageChannel beginningChannel = MessageChannels.direct("beginning").get();
StandardIntegrationFlow composedFlow = IntegrationFlows
.from(beginningChannel)
.gateway(processingFlow)
.log()
.get();
flowContext.registration(composedFlow)
.id("testComposed")
.autoStartup(false)
.addBean(processingFlow)
.register();
然后我开始流程并发送几条消息:
composedFlow.start();
beginningChannel.send(MessageBuilder.withPayload(new String("first string")).build());
beginningChannel.send(MessageBuilder.withPayload(new String("second string")).build());
日志记录处理程序确认已为第一条消息调用 handle 方法,但主线程随后处于空闲状态,第二条消息永远不会被处理。
这不是从构建块组成集成流的正确方法吗?使用频道这样做需要将频道注册为 bean,我正在尝试动态地完成所有这些工作。
processingFlow
中必须是logAndReply()
。请参阅他们的 JavaDocs 了解不同之处。流程末尾的 log()
使其成为 one-way。这就是为什么你被阻止,因为网关等待回复,但没有人根据你当前的流定义。不幸的是,我们无法从框架级别确定这一点:在某些情况下,您可能确实没有 return 根据某些路由或过滤逻辑。网关可以配置回复超时。默认情况下它是无限的。