Spring 集成聚合器节流器
Spring Integration Aggregator Throttler
我有一封邮件 SomeMessage
看起来像这样:
class SomeMessage{
id,
title
}
目前,我根据 id 聚合消息。消息在 10 秒后释放。
.aggregate(
a ->
a
.outputProcessor(messageProcessor())
.messageStore(messageGroupStore())
.correlationStrategy(correlationStrategy())
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeout(TimeUnit.SECONDS.toMillis(10)))
.handle(amqpOutboundEndpoint)
我需要的是一种基于 title
属性 限制消息的方法。如果title=="A"
,聚合还是要等待10秒;如果 title=="B"
它应该等待 60 秒进行聚合并且它不应该立即发送到 amqpOutboundEndpoint
但它应该有一些限制(例如每条消息之间有 30 秒 title=="B"
)。
最好的方法是什么?在 AmqpOutboundEndpoint
上有类似节流的东西吗?
更新
.groupTimeout(messageGroup -> {
if(anyMessageInGroupHasTitleB(messageGroup)){
return TimeUnit.SECONDS.toMillis(60);
}
else {
return TimeUnit.SECONDS.toMillis(10);
}
}))
.route(
(Function<SomeMessage, Boolean>) ec ->
ec.getTitle().equals("B"),
m -> m.subFlowMapping(true, sf ->
sf.channel(channels -> channels.queue(1))
.bridge(e -> e.poller(Pollers
.fixedDelay(60, TimeUnit.SECONDS)
.maxMessagesPerPoll(1)
))
).subFlowMapping(false, IntegrationFlowDefinition::bridge))
.handle(amqpOutboundEndpoint)
使用 groupTimeoutExpression()
而不是固定超时...
payload.title == 'A' ? 10000 : 30000
我有一封邮件 SomeMessage
看起来像这样:
class SomeMessage{
id,
title
}
目前,我根据 id 聚合消息。消息在 10 秒后释放。
.aggregate(
a ->
a
.outputProcessor(messageProcessor())
.messageStore(messageGroupStore())
.correlationStrategy(correlationStrategy())
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeout(TimeUnit.SECONDS.toMillis(10)))
.handle(amqpOutboundEndpoint)
我需要的是一种基于 title
属性 限制消息的方法。如果title=="A"
,聚合还是要等待10秒;如果 title=="B"
它应该等待 60 秒进行聚合并且它不应该立即发送到 amqpOutboundEndpoint
但它应该有一些限制(例如每条消息之间有 30 秒 title=="B"
)。
最好的方法是什么?在 AmqpOutboundEndpoint
上有类似节流的东西吗?
更新
.groupTimeout(messageGroup -> {
if(anyMessageInGroupHasTitleB(messageGroup)){
return TimeUnit.SECONDS.toMillis(60);
}
else {
return TimeUnit.SECONDS.toMillis(10);
}
}))
.route(
(Function<SomeMessage, Boolean>) ec ->
ec.getTitle().equals("B"),
m -> m.subFlowMapping(true, sf ->
sf.channel(channels -> channels.queue(1))
.bridge(e -> e.poller(Pollers
.fixedDelay(60, TimeUnit.SECONDS)
.maxMessagesPerPoll(1)
))
).subFlowMapping(false, IntegrationFlowDefinition::bridge))
.handle(amqpOutboundEndpoint)
使用 groupTimeoutExpression()
而不是固定超时...
payload.title == 'A' ? 10000 : 30000