Spring 集成聚合器时间过期 - 问题

Spring integration aggregator time expire - issue

在进入出站通道之前,下面的代码正在接受 2 条消息。

<bean id="timeout"
    class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
    <constructor-arg name="threshold" value="2" />
    <constructor-arg name="timeout" value="7000" />
</bean> 

<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput" 
        method="handleMessage" release-strategy="releaseStrategyBean" release-strategy-method="timeout">
</int:aggregator>

我的用例是在 10 分钟内整理所有消息并将其发送到出站通道。不是基于消息的数量,如上所示。 要实现此基于时间的功能,请使用以下代码:

<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput" 
        method="handleMessage" 
                output-channel="outputappendFilenameinHeader" >
</int:aggregator>

<bean id="updateCreate" class="helper.UpdateCreateHelper"/>

我传递了 10 条消息,PojoDateStrategyHelper canRelease 方法被调用了 10 次。

尝试使用时差逻辑实现 PojoDateStrategyHelper,它按预期工作。 10 分钟后 UpdateCreateHelper class 被调用,但它只收到 1 条消息(最后一条消息)。其余 9 条消息在任何地方都看不到。我在这里做错了什么吗?消息没有整理。

我怀疑 SI 中应该有一些内置的东西可以实现这一点,如果我将 10 分钟作为参数传递,一旦 10 分钟时间到期,它应该将所有消息传递到出站通道。

这是我的 UpdateCreateHelper.java 代码:

public Message<?> handleMessage(List<Message<?>> flights){

    LOGGER.debug("orderItems list ::"+flights.size()); // this is always printing 1

    MessageBuilder<?> messageWithHeader = MessageBuilder.withPayload(flights.get(0).getPayload().toString());
    messageWithHeader.setHeader("ftp_filename", "");

    return messageWithHeader.build();
}

@CorrelationStrategy
public String correlateBy(@Header("id") String id) {
    return id;
}
@ReleaseStrategy
public boolean canRelease(List<Message<?>> flights) {
    LOGGER.debug("inside canRelease ::"+flights.size()); // This is called for each and every message
    return compareTime(date.getTime(), new Date().getTime());
}

我是 SI 的新手 (v3.x),我搜索了很多时间限制相关的聚合器,找不到任何有用的资源,请建议。

谢谢!

打开 DEBUG 日志记录以查看为什么您只看到一条消息。

I suspect there should be something inbuilt with in SI, which can achieve this, ...

在 4.0 版之前(默认情况下,在 4.0 版之后),聚合器是一个完全被动的组件;只有在有新消息到达时才参考发布策略。

4.0 added group timeout capabilities 超时后可以释放(或丢弃)部分组。

但是,对于任何版本,您都可以配置 MessageGroupStoreReaper 以在超时后释放部分完整的组。参见 the documentation

private String correlationId = date.toString();

@CorrelationStrategy
public String correlateBy(Message<?> message) {
    **// Return the correlation ID which is the timestamp the current window started (all messages should have the same correlation id)**
    return "same";
}

之前我返回的是 Header Id,这与 Message to Message 不同。我希望这个解决方案可以帮助一些人。忽略这么小的一个概念,我浪费了将近2天。