Spring 集成 - 屏障和路由器

Spring Integration - Barrier and Router

我试着玩一下这里的例子 https://github.com/spring-projects/spring-integration-samples/commit/3c855f82047c2e5f639bbec47ad44b4782b366da, 所以而不是行:

<int:splitter input-channel="processChannel" output-channel="process" order="1" />

我在下面添加:

<int:splitter input-channel="processChannel" output-channel="someCheckGate" order="1"/>

<int:channel id="someCheckGate"/>

<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="aggregatorChannel" expression="1 eq 1">
    <int:mapping value="true" channel="aggregatorChannel"/>
    <int:mapping value="false" channel="aggregatorChannel"/>
</int:router>

(true/false 指向同一个频道来描述我的问题)。我试图做的是根据某些条件(基于 headers 或有效负载......,目前总是正确的)绕过进程通道并且它绕过很好,聚合很好但是它挂起定义调用 BarrierMessageHandler.trigger 后超时:

if (!syncQueue.offer(message, timeout, TimeUnit.MILLISECONDS))

和 returns 回到下一行:

this.logger.error("Suspending thread timed out or did not arrive within timeout for: " + message);

syncQueue 大小为 0。

线程返回后它试图释放但随后再次挂起:

Message<?> releaseMessage = syncQueue.poll(this.timeout, TimeUnit.MILLISECONDS);

获取 null 并最终在屏障超时时抛出 ReplyRequiredException。 这里似乎是相当不错的逻辑......:) 你能告诉我我在这里错过了什么吗?为什么这个路由器不在这里?

问题是,在这种情况下,您正试图在与要挂起的线程相同的线程上释放屏障。

在这种情况下(因为 process 被所有拆分跳过),释放发生在挂起之前。

屏障的工作方式是,如果释放先发生,则该线程等待直到挂起线程到达。因为是同一个线程,所以永远不会发生。

请注意 process 是一个队列通道 - 这些消息被传递给另一个线程。因此,您需要另一个队列通道用于 "skipped" 拆分。

<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="toAgg" expression="1 eq 1">
    <int:mapping value="true" channel="toAgg"/>
    <int:mapping value="false" channel="toAgg"/>
</int:router>

<int:channel id="toAgg">
    <int:queue />
</int:channel>

<int:bridge input-channel="toAgg" output-channel="aggregatorChannel">
    <int:poller fixed-delay="1000" />
</int:bridge>

此外,该示例是为预期异常而编写的;既然都成功了,框架要回复一下

this commit for a full working version with the router in place

另一个解决方案是使 aggregatorChannel 成为 ExecutorChannel:

<int:channel id="aggregatorChannel">
    <int:dispatcher task-executor="exec" />
</int:channel>