Spring Integration:Is 有办法从频道中的 "all" 消息中聚合吗?

Spring Integration:Is there a way to aggregate from "all" messages in a channel?

我想写一个批处理,每天从一个路径读取网站访问日志文件(csv 文件),并使用 spring 集成进行一些分析。

这是输入 csv 文件的简化版本。

srcIp1,t1,path1
srcIp2,t2,path2
srcIp1,t3,path2
srcIp1,t4,path1

每个源ip和路径的访问数要经过一些过滤逻辑计算。

我制作了一个输入通道,其有效负载是已解析的日志行,并应用了一个过滤器,最后是一个聚合器来计算最终结果。 问题是正确的组释放策略应该是什么,默认的释放策略(SequenceSizeReleaseStrategy)不起作用。

还有任何其他 spring 集成开箱即用版本

strategies(ExpressionEvaluatingReleaseStrategy, MessageCountReleaseStrategy, MethodInvokingReleaseStrategy, SequenceSizeReleaseStrategy, TimeoutCountSequenceSizeReleaseStrategy)

似乎不​​符合我的需求。

或 Spring 集成假定通道承载消息流,其中没有 "ending of message" 的概念并且不适合我这里的问题?

如果您可以通过某种方式判断组何时完成,则可以编写自定义 ReleaseStrategy。每次向群组添加消息时都会查阅它。

或者,您可以使用 group-timeout 在一段时间后没有消息到达时释放部分群组。