Spring 集成 - 屏障和路由器
Spring Integration - Barrier and Router
我试着玩一下这里的例子 https://github.com/spring-projects/spring-integration-samples/commit/3c855f82047c2e5f639bbec47ad44b4782b366da,
所以而不是行:
<int:splitter input-channel="processChannel" output-channel="process" order="1" />
我在下面添加:
<int:splitter input-channel="processChannel" output-channel="someCheckGate" order="1"/>
<int:channel id="someCheckGate"/>
<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="aggregatorChannel" expression="1 eq 1">
<int:mapping value="true" channel="aggregatorChannel"/>
<int:mapping value="false" channel="aggregatorChannel"/>
</int:router>
(true/false 指向同一个频道来描述我的问题)。我试图做的是根据某些条件(基于 headers 或有效负载......,目前总是正确的)绕过进程通道并且它绕过很好,聚合很好但是它挂起定义调用 BarrierMessageHandler.trigger 后超时:
if (!syncQueue.offer(message, timeout, TimeUnit.MILLISECONDS))
和 returns 回到下一行:
this.logger.error("Suspending thread timed out or did not arrive within timeout for: " + message);
syncQueue 大小为 0。
线程返回后它试图释放但随后再次挂起:
Message<?> releaseMessage = syncQueue.poll(this.timeout, TimeUnit.MILLISECONDS);
获取 null 并最终在屏障超时时抛出 ReplyRequiredException。
这里似乎是相当不错的逻辑......:)
你能告诉我我在这里错过了什么吗?为什么这个路由器不在这里?
问题是,在这种情况下,您正试图在与要挂起的线程相同的线程上释放屏障。
在这种情况下(因为 process
被所有拆分跳过),释放发生在挂起之前。
屏障的工作方式是,如果释放先发生,则该线程等待直到挂起线程到达。因为是同一个线程,所以永远不会发生。
请注意 process
是一个队列通道 - 这些消息被传递给另一个线程。因此,您需要另一个队列通道用于 "skipped" 拆分。
<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="toAgg" expression="1 eq 1">
<int:mapping value="true" channel="toAgg"/>
<int:mapping value="false" channel="toAgg"/>
</int:router>
<int:channel id="toAgg">
<int:queue />
</int:channel>
<int:bridge input-channel="toAgg" output-channel="aggregatorChannel">
<int:poller fixed-delay="1000" />
</int:bridge>
此外,该示例是为预期异常而编写的;既然都成功了,框架要回复一下
见this commit for a full working version with the router in place。
另一个解决方案是使 aggregatorChannel
成为 ExecutorChannel
:
<int:channel id="aggregatorChannel">
<int:dispatcher task-executor="exec" />
</int:channel>
我试着玩一下这里的例子 https://github.com/spring-projects/spring-integration-samples/commit/3c855f82047c2e5f639bbec47ad44b4782b366da, 所以而不是行:
<int:splitter input-channel="processChannel" output-channel="process" order="1" />
我在下面添加:
<int:splitter input-channel="processChannel" output-channel="someCheckGate" order="1"/>
<int:channel id="someCheckGate"/>
<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="aggregatorChannel" expression="1 eq 1">
<int:mapping value="true" channel="aggregatorChannel"/>
<int:mapping value="false" channel="aggregatorChannel"/>
</int:router>
(true/false 指向同一个频道来描述我的问题)。我试图做的是根据某些条件(基于 headers 或有效负载......,目前总是正确的)绕过进程通道并且它绕过很好,聚合很好但是它挂起定义调用 BarrierMessageHandler.trigger 后超时:
if (!syncQueue.offer(message, timeout, TimeUnit.MILLISECONDS))
和 returns 回到下一行:
this.logger.error("Suspending thread timed out or did not arrive within timeout for: " + message);
syncQueue 大小为 0。
线程返回后它试图释放但随后再次挂起:
Message<?> releaseMessage = syncQueue.poll(this.timeout, TimeUnit.MILLISECONDS);
获取 null 并最终在屏障超时时抛出 ReplyRequiredException。 这里似乎是相当不错的逻辑......:) 你能告诉我我在这里错过了什么吗?为什么这个路由器不在这里?
问题是,在这种情况下,您正试图在与要挂起的线程相同的线程上释放屏障。
在这种情况下(因为 process
被所有拆分跳过),释放发生在挂起之前。
屏障的工作方式是,如果释放先发生,则该线程等待直到挂起线程到达。因为是同一个线程,所以永远不会发生。
请注意 process
是一个队列通道 - 这些消息被传递给另一个线程。因此,您需要另一个队列通道用于 "skipped" 拆分。
<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="toAgg" expression="1 eq 1">
<int:mapping value="true" channel="toAgg"/>
<int:mapping value="false" channel="toAgg"/>
</int:router>
<int:channel id="toAgg">
<int:queue />
</int:channel>
<int:bridge input-channel="toAgg" output-channel="aggregatorChannel">
<int:poller fixed-delay="1000" />
</int:bridge>
此外,该示例是为预期异常而编写的;既然都成功了,框架要回复一下
见this commit for a full working version with the router in place。
另一个解决方案是使 aggregatorChannel
成为 ExecutorChannel
:
<int:channel id="aggregatorChannel">
<int:dispatcher task-executor="exec" />
</int:channel>