Spring 集成聚合器时间过期 - 问题
Spring integration aggregator time expire - issue
在进入出站通道之前,下面的代码正在接受 2 条消息。
<bean id="timeout"
class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
<constructor-arg name="threshold" value="2" />
<constructor-arg name="timeout" value="7000" />
</bean>
<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput"
method="handleMessage" release-strategy="releaseStrategyBean" release-strategy-method="timeout">
</int:aggregator>
我的用例是在 10 分钟内整理所有消息并将其发送到出站通道。不是基于消息的数量,如上所示。
要实现此基于时间的功能,请使用以下代码:
<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput"
method="handleMessage"
output-channel="outputappendFilenameinHeader" >
</int:aggregator>
<bean id="updateCreate" class="helper.UpdateCreateHelper"/>
我传递了 10 条消息,PojoDateStrategyHelper canRelease 方法被调用了 10 次。
尝试使用时差逻辑实现 PojoDateStrategyHelper,它按预期工作。 10 分钟后 UpdateCreateHelper class 被调用,但它只收到 1 条消息(最后一条消息)。其余 9 条消息在任何地方都看不到。我在这里做错了什么吗?消息没有整理。
我怀疑 SI 中应该有一些内置的东西可以实现这一点,如果我将 10 分钟作为参数传递,一旦 10 分钟时间到期,它应该将所有消息传递到出站通道。
这是我的 UpdateCreateHelper.java 代码:
public Message<?> handleMessage(List<Message<?>> flights){
LOGGER.debug("orderItems list ::"+flights.size()); // this is always printing 1
MessageBuilder<?> messageWithHeader = MessageBuilder.withPayload(flights.get(0).getPayload().toString());
messageWithHeader.setHeader("ftp_filename", "");
return messageWithHeader.build();
}
@CorrelationStrategy
public String correlateBy(@Header("id") String id) {
return id;
}
@ReleaseStrategy
public boolean canRelease(List<Message<?>> flights) {
LOGGER.debug("inside canRelease ::"+flights.size()); // This is called for each and every message
return compareTime(date.getTime(), new Date().getTime());
}
我是 SI 的新手 (v3.x),我搜索了很多时间限制相关的聚合器,找不到任何有用的资源,请建议。
谢谢!
打开 DEBUG 日志记录以查看为什么您只看到一条消息。
I suspect there should be something inbuilt with in SI, which can achieve this, ...
在 4.0 版之前(默认情况下,在 4.0 版之后),聚合器是一个完全被动的组件;只有在有新消息到达时才参考发布策略。
4.0 added group timeout capabilities 超时后可以释放(或丢弃)部分组。
但是,对于任何版本,您都可以配置 MessageGroupStoreReaper
以在超时后释放部分完整的组。参见 the documentation。
private String correlationId = date.toString();
@CorrelationStrategy
public String correlateBy(Message<?> message) {
**// Return the correlation ID which is the timestamp the current window started (all messages should have the same correlation id)**
return "same";
}
之前我返回的是 Header Id,这与 Message to Message 不同。我希望这个解决方案可以帮助一些人。忽略这么小的一个概念,我浪费了将近2天。
在进入出站通道之前,下面的代码正在接受 2 条消息。
<bean id="timeout"
class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
<constructor-arg name="threshold" value="2" />
<constructor-arg name="timeout" value="7000" />
</bean>
<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput"
method="handleMessage" release-strategy="releaseStrategyBean" release-strategy-method="timeout">
</int:aggregator>
我的用例是在 10 分钟内整理所有消息并将其发送到出站通道。不是基于消息的数量,如上所示。 要实现此基于时间的功能,请使用以下代码:
<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput"
method="handleMessage"
output-channel="outputappendFilenameinHeader" >
</int:aggregator>
<bean id="updateCreate" class="helper.UpdateCreateHelper"/>
我传递了 10 条消息,PojoDateStrategyHelper canRelease 方法被调用了 10 次。
尝试使用时差逻辑实现 PojoDateStrategyHelper,它按预期工作。 10 分钟后 UpdateCreateHelper class 被调用,但它只收到 1 条消息(最后一条消息)。其余 9 条消息在任何地方都看不到。我在这里做错了什么吗?消息没有整理。
我怀疑 SI 中应该有一些内置的东西可以实现这一点,如果我将 10 分钟作为参数传递,一旦 10 分钟时间到期,它应该将所有消息传递到出站通道。
这是我的 UpdateCreateHelper.java 代码:
public Message<?> handleMessage(List<Message<?>> flights){
LOGGER.debug("orderItems list ::"+flights.size()); // this is always printing 1
MessageBuilder<?> messageWithHeader = MessageBuilder.withPayload(flights.get(0).getPayload().toString());
messageWithHeader.setHeader("ftp_filename", "");
return messageWithHeader.build();
}
@CorrelationStrategy
public String correlateBy(@Header("id") String id) {
return id;
}
@ReleaseStrategy
public boolean canRelease(List<Message<?>> flights) {
LOGGER.debug("inside canRelease ::"+flights.size()); // This is called for each and every message
return compareTime(date.getTime(), new Date().getTime());
}
我是 SI 的新手 (v3.x),我搜索了很多时间限制相关的聚合器,找不到任何有用的资源,请建议。
谢谢!
打开 DEBUG 日志记录以查看为什么您只看到一条消息。
I suspect there should be something inbuilt with in SI, which can achieve this, ...
在 4.0 版之前(默认情况下,在 4.0 版之后),聚合器是一个完全被动的组件;只有在有新消息到达时才参考发布策略。
4.0 added group timeout capabilities 超时后可以释放(或丢弃)部分组。
但是,对于任何版本,您都可以配置 MessageGroupStoreReaper
以在超时后释放部分完整的组。参见 the documentation。
private String correlationId = date.toString();
@CorrelationStrategy
public String correlateBy(Message<?> message) {
**// Return the correlation ID which is the timestamp the current window started (all messages should have the same correlation id)**
return "same";
}
之前我返回的是 Header Id,这与 Message to Message 不同。我希望这个解决方案可以帮助一些人。忽略这么小的一个概念,我浪费了将近2天。