Spring 集成阻塞轮询器、异步下游、流完成信号

Spring Integration Blocking Poller, async Downstream, flow complete signal

我正在尝试配置一个查询 bean 的轮询器,以便每隔 X 秒将一个列表放入一个通道中。该通道有一个下游流,它拆分列表并输出到 pub/sub 通道(进一步的异步流) 我如何才能确保在任何给定时间只有在执行流时并且轮询器必须 wait/block 直到流完成直到它为下一次轮询做好准备(固定 rate/delay)?

<int:channel id="configListChannel" />
<task:executor id="pollExecutor" pool-size="1" queue-capacity="1" rejection-policy="ABORT" />
<int:inbound-channel-adapter expression="configMap().values()" auto-startup="true" channel="configListChannel">
    <int:poller fixed-delay="30" time-unit="SECONDS" task-executor="pollExecutor"/>
</int:inbound-channel-adapter>

<task:executor id="configExecutor" pool-size="5"/>
<int:channel id="configChannel" >
    <int:dispatcher task-executor="configExecutor"/>
</int:channel>
<int:chain input-channel="configListChannel" output-channel="configChannel" id="configChain">
    <int:splitter/>
    <int:filter expression="payload.enablePolling"/>
</int:chain>

... configChannel 上的进一步异步流以发送出站消息

有没有异步切换阻塞轮询器并使用屏障来通知轮询器线程完成的示例?一次也只有一个民意调查。

我建议您实施 ReceiveMessageAdvice(自 5.3 或 AbstractMessageSourceAdvice 起)。它 afterReceive() 应该只是 return 原样的消息,但是 beforeReceive() 应该检查一些状态, return false 如果你现在不能轮询。

您可能不需要该任务的屏障,但是简单的 AtomicBoolean bean 可以检查 beforeReceive()false 中的状态并将其返回到 [=18] =] 当你完成下游的任务时。