Spring 集成:Scatter-Gather、Gather 在并行 Web 服务调用后不起作用
Spring Integration : Scatter-Gather, Gather not working after parallel web service calls
我正在设置 spring 集成流程以使用来自 MQ 的消息并通过创建从 MQ 消息创建的请求来并行调用 Web 服务调用。
下面是 spring 集成流程的样子
- 使用来自 IBM MQ 的消息,使用 Marshaller 转换消息并将实体保存到 DB。
- 将保存的实体发送到 Scatter-Gather 频道。
Scatter-Gather Channel 有两个分发通道,每个分发通道是一个包含以下组件的链
- Web 服务客户端调用 web 服务(服务激活器)
- 将响应转换为实体对象的转换器
- 将数据保存到数据库的处理程序。
收集来自并行网络服务调用的响应,并将从两个并行网络服务调用创建的新对象发送到 RabbitMQ。
我能够从分散-聚集模式进行并行 Web 服务调用,但我没有看到在我的聚集模式中发生聚合,基本上流量不会进入收集器 class。
我尝试使用任务执行器的发布-订阅通道作为分散-收集模式的输入通道,根据日志,webservice 调用与两个任务执行器并行发生,但它在 webservice 调用后从未到达收集器。
<si:service-activator input-channel="transformedEntity"
ref="incidentHandler" output-channel="outputChannelFromMQ" />
<si:scatter-gather input-channel="outputChannelFromMQ"
requires-reply="false" output-channel="gatherResponseOutputChannel" gather-channel="gatherChannel" gather-timeout="4000">
<si:scatterer apply-sequence="true">
<si:recipient channel="distributionChannel1" />
<si:recipient channel="distributionChannel2" />
</si:scatterer>
</si:scatter-gather>
<si:publish-subscribe-channel id="outputChannelFromMQ" apply-sequence="true"
task-executor="taskExecutor" />
<task:executor id="taskExecutor" queue-capacity="25" pool-size="10-10" />
<si:chain id="planngedBagsChain" input-channel="distributionChannel1"
output-channel="gatherChannel">
<si:service-activator ref="webServiceClient1" method="getResponse" />
<si:service-activator ref="serviceHandler1" method="saveToDB" />
</si:chain>
<si:chain id="bagHistoryChain" input-channel="distributionChannel2"
output-channel="gatherChannel">
<si:service-activator ref="webServiceClient2" method="getResponse" />
<si:transformer ref="transformer" />
<si:service-activator ref="serviceHandler2" method="saveToDB" />
</si:chain>
<si:service-activator input-channel="gatherResponseOutputChannel"
ref="responseTransformer" method="receiveResponse" output-channel="toRabbitMQ" />
启用调试日志后,可以找到发送到的所有频道。通过以上配置,它不会成为正确的收集器。更改为以下配置有效。
<si:service-activator input-channel="transformedEntity"
ref="incidentHandler" output-channel="outputChannelFromMQ" />
<si:channel id="outputChannelFromMQ"></si:channel>
<si:scatter-gather input-channel="outputChannelFromMQ"
requires-reply="false" scatter-channel="scatterInputChannel" output-channel="toRabbit"
gather-channel="gatherChannel" gather-timeout="4000">
<si:gatherer id="responseGatherer" ref="responseTransformer" release-strategy-expression="size() == 2"/>
</si:scatter-gather>
<si:publish-subscribe-channel id="scatterInputChannel" apply-sequence="true"
task-executor="taskExecutor" />
<task:executor id="taskExecutor" queue-capacity="25" pool-size="10-10" />
<si:chain id="planngedBagsChain" input-channel="scatterInputChannel"
output-channel="gatherChannel">
<si:service-activator ref="webServiceClient1" method="getResponse" />
<si:service-activator ref="serviceHandler1" method="saveToDB" />
</si:chain>
<si:chain id="bagHistoryChain" input-channel="scatterInputChannel"
output-channel="gatherChannel">
<si:service-activator ref="webServiceClient2" method="getResponse" />
<si:transformer ref="transformer" />
<si:service-activator ref="serviceHandler2" method="saveToDB" />
</si:chain>
我正在设置 spring 集成流程以使用来自 MQ 的消息并通过创建从 MQ 消息创建的请求来并行调用 Web 服务调用。
下面是 spring 集成流程的样子
- 使用来自 IBM MQ 的消息,使用 Marshaller 转换消息并将实体保存到 DB。
- 将保存的实体发送到 Scatter-Gather 频道。
Scatter-Gather Channel 有两个分发通道,每个分发通道是一个包含以下组件的链
- Web 服务客户端调用 web 服务(服务激活器)
- 将响应转换为实体对象的转换器
- 将数据保存到数据库的处理程序。
收集来自并行网络服务调用的响应,并将从两个并行网络服务调用创建的新对象发送到 RabbitMQ。
我能够从分散-聚集模式进行并行 Web 服务调用,但我没有看到在我的聚集模式中发生聚合,基本上流量不会进入收集器 class。
我尝试使用任务执行器的发布-订阅通道作为分散-收集模式的输入通道,根据日志,webservice 调用与两个任务执行器并行发生,但它在 webservice 调用后从未到达收集器。
<si:service-activator input-channel="transformedEntity"
ref="incidentHandler" output-channel="outputChannelFromMQ" />
<si:scatter-gather input-channel="outputChannelFromMQ"
requires-reply="false" output-channel="gatherResponseOutputChannel" gather-channel="gatherChannel" gather-timeout="4000">
<si:scatterer apply-sequence="true">
<si:recipient channel="distributionChannel1" />
<si:recipient channel="distributionChannel2" />
</si:scatterer>
</si:scatter-gather>
<si:publish-subscribe-channel id="outputChannelFromMQ" apply-sequence="true"
task-executor="taskExecutor" />
<task:executor id="taskExecutor" queue-capacity="25" pool-size="10-10" />
<si:chain id="planngedBagsChain" input-channel="distributionChannel1"
output-channel="gatherChannel">
<si:service-activator ref="webServiceClient1" method="getResponse" />
<si:service-activator ref="serviceHandler1" method="saveToDB" />
</si:chain>
<si:chain id="bagHistoryChain" input-channel="distributionChannel2"
output-channel="gatherChannel">
<si:service-activator ref="webServiceClient2" method="getResponse" />
<si:transformer ref="transformer" />
<si:service-activator ref="serviceHandler2" method="saveToDB" />
</si:chain>
<si:service-activator input-channel="gatherResponseOutputChannel"
ref="responseTransformer" method="receiveResponse" output-channel="toRabbitMQ" />
启用调试日志后,可以找到发送到的所有频道。通过以上配置,它不会成为正确的收集器。更改为以下配置有效。
<si:service-activator input-channel="transformedEntity"
ref="incidentHandler" output-channel="outputChannelFromMQ" />
<si:channel id="outputChannelFromMQ"></si:channel>
<si:scatter-gather input-channel="outputChannelFromMQ"
requires-reply="false" scatter-channel="scatterInputChannel" output-channel="toRabbit"
gather-channel="gatherChannel" gather-timeout="4000">
<si:gatherer id="responseGatherer" ref="responseTransformer" release-strategy-expression="size() == 2"/>
</si:scatter-gather>
<si:publish-subscribe-channel id="scatterInputChannel" apply-sequence="true"
task-executor="taskExecutor" />
<task:executor id="taskExecutor" queue-capacity="25" pool-size="10-10" />
<si:chain id="planngedBagsChain" input-channel="scatterInputChannel"
output-channel="gatherChannel">
<si:service-activator ref="webServiceClient1" method="getResponse" />
<si:service-activator ref="serviceHandler1" method="saveToDB" />
</si:chain>
<si:chain id="bagHistoryChain" input-channel="scatterInputChannel"
output-channel="gatherChannel">
<si:service-activator ref="webServiceClient2" method="getResponse" />
<si:transformer ref="transformer" />
<si:service-activator ref="serviceHandler2" method="saveToDB" />
</si:chain>