Spring 集成每 10 秒聚合一次消息
Spring integration aggregate messages every 10 seconds
在我的一个流程中,我希望每 10 秒聚合一次,并将这些有效负载写入文件共享。我不是很清楚如何使用聚合器。
@Bean
public IntegrationFlow errorHandlingQueueFlow() {
return IntegrationFlows.from(ERROR_QUEUE_CHANNEL)
.bridge(e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(MAX_MSG_PER_POLL)))
.aggregate(a -> a.groupTimeout(10000))// How do i make it collect every 10 seconds.
.transform(objectToCSVTransformer, "transform")//Converts payload to a CSV
.handle(smbErrorMessageHandler())// Takes care of writing into Fileshare
.get();
}
由于这是用于错误处理,因此只有少数出错的消息会进入此 ERROR_QUEUE_CHANNEL。所以我想每 10 秒收集一次,而不是等待收到来自一个组的所有消息。当我使用 grouptimeout 时,每 10 秒将收集到的所有消息发送到 nullchannel。
groupTimeout()
的默认目的是清理过期的组。如果你想正常释放它们而不是丢弃它们,你应该考虑使用 sendPartialResultOnExpiry = true
。当然,如果您在这些消息中确实有相关详细信息 headers,那么这一切都是有意义的。否则你需要考虑 correlationStrategy
来对这些错误消息进行分组。
请在文档中阅读有关聚合器及其选项的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator
在我的一个流程中,我希望每 10 秒聚合一次,并将这些有效负载写入文件共享。我不是很清楚如何使用聚合器。
@Bean
public IntegrationFlow errorHandlingQueueFlow() {
return IntegrationFlows.from(ERROR_QUEUE_CHANNEL)
.bridge(e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(MAX_MSG_PER_POLL)))
.aggregate(a -> a.groupTimeout(10000))// How do i make it collect every 10 seconds.
.transform(objectToCSVTransformer, "transform")//Converts payload to a CSV
.handle(smbErrorMessageHandler())// Takes care of writing into Fileshare
.get();
}
由于这是用于错误处理,因此只有少数出错的消息会进入此 ERROR_QUEUE_CHANNEL。所以我想每 10 秒收集一次,而不是等待收到来自一个组的所有消息。当我使用 grouptimeout 时,每 10 秒将收集到的所有消息发送到 nullchannel。
groupTimeout()
的默认目的是清理过期的组。如果你想正常释放它们而不是丢弃它们,你应该考虑使用 sendPartialResultOnExpiry = true
。当然,如果您在这些消息中确实有相关详细信息 headers,那么这一切都是有意义的。否则你需要考虑 correlationStrategy
来对这些错误消息进行分组。
请在文档中阅读有关聚合器及其选项的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator