Spring 集成每 10 秒聚合一次消息

Spring integration aggregate messages every 10 seconds

在我的一个流程中,我希望每 10 秒聚合一次,并将这些有效负载写入文件共享。我不是很清楚如何使用聚合器。

@Bean
public IntegrationFlow errorHandlingQueueFlow() {
    return IntegrationFlows.from(ERROR_QUEUE_CHANNEL)
            .bridge(e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(MAX_MSG_PER_POLL)))                
            .aggregate(a -> a.groupTimeout(10000))// How do i make it collect every 10 seconds.
            .transform(objectToCSVTransformer, "transform")//Converts payload to a CSV
            .handle(smbErrorMessageHandler())// Takes care of writing into Fileshare
            .get();
}

由于这是用于错误处理,因此只有少数出错的消息会进入此 ERROR_QUEUE_CHANNEL。所以我想每 10 秒收集一次,而不是等待收到来自一个组的所有消息。当我使用 grouptimeout 时,每 10 秒将收集到的所有消息发送到 nullchannel。

groupTimeout()的默认目的是清理过期的组。如果你想正常释放它们而不是丢弃它们,你应该考虑使用 sendPartialResultOnExpiry = true。当然,如果您在这些消息中确实有相关详细信息 headers,那么这一切都是有意义的。否则你需要考虑 correlationStrategy 来对这些错误消息进行分组。

请在文档中阅读有关聚合器及其选项的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator