Spring 集成聚合器节流器

Spring Integration Aggregator Throttler

我有一封邮件 SomeMessage 看起来像这样:

class SomeMessage{
 id,
 title
}

目前,我根据 id 聚合消息。消息在 10 秒后释放。

.aggregate(
            a ->
                a 
                .outputProcessor(messageProcessor())
                .messageStore(messageGroupStore())
                .correlationStrategy(correlationStrategy())
                .expireGroupsUponCompletion(true)
                .sendPartialResultOnExpiry(true)
                .groupTimeout(TimeUnit.SECONDS.toMillis(10)))
        .handle(amqpOutboundEndpoint)

我需要的是一种基于 title 属性 限制消息的方法。如果title=="A",聚合还是要等待10秒;如果 title=="B" 它应该等待 60 秒进行聚合并且它不应该立即发送到 amqpOutboundEndpoint 但它应该有一些限制(例如每条消息之间有 30 秒 title=="B")。

最好的方法是什么?在 AmqpOutboundEndpoint 上有类似节流的东西吗?

更新

.groupTimeout(messageGroup -> {
                      if(anyMessageInGroupHasTitleB(messageGroup)){
                        return TimeUnit.SECONDS.toMillis(60);
                      }
                      else {
                        return TimeUnit.SECONDS.toMillis(10);
                      }
                    }))
        .route(
            (Function<SomeMessage, Boolean>) ec ->
            ec.getTitle().equals("B"),
        m -> m.subFlowMapping(true, sf ->
            sf.channel(channels -> channels.queue(1))
                .bridge(e -> e.poller(Pollers
                    .fixedDelay(60, TimeUnit.SECONDS)
                    .maxMessagesPerPoll(1)
                ))
        ).subFlowMapping(false, IntegrationFlowDefinition::bridge))
    .handle(amqpOutboundEndpoint)

使用 groupTimeoutExpression() 而不是固定超时...

payload.title == 'A' ? 10000 : 30000