spring 集成 - 如何将分割线聚合成 x 批次?

spring integration - how to aggregate split lines into batch of x?

我们需要将事件发送到 kinesis,由于 aws 定价,我们计划将记录分批放入 kinesis。

我们读入一个csv文件,然后使用文件拆分器将行吐出并将每一行转换为json。

那么在转换为 json 之后,我们如何将这些行分批为每批 25 行,以便我们的 kinesis serviceActivator 可以发送批次?

任何示例将不胜感激。

    <int-file:splitter id="fileLineSplitter"
                       input-channel="fileInputChannel"
                       output-channel="splitterOutputChannel"
                       markers="true" />


<int:transformer id="csvToDataCdrTransformer"
                     ref="dataCdrLineTransformer"
                     method="transform"
                     input-channel="lineOutputChannel"
                     output-channel="dataCdrObjectInputChannel">
    </int:transformer>


    <int:object-to-json-transformer input-channel="dataCdrObjectInputChannel"
                                    output-channel="kinesisSendChannel">
        <int:poller fixed-delay="50"/>
    </int:object-to-json-transformer>

编辑:我按照 "Artem Bilan" 的建议添加并且有效

<int:aggregator input-channel="aggregateChannel"
                output-channel="toJsonChannel"
                release-strategy-expression="#this.size() eq 2"
                expire-groups-upon-completion="true"/>

但我收到错误:

  1. 我正在使用 markers="true" 以便我们知道文件的结尾,因此我们可以将其重命名为“.done”。

  2. 在拆分器和转换器之间添加了一个路由器,当 FileMarker 为 END 时,它简单地路由到 "nullChannel" 或 "fileProcessedChannel",否则,拆分线进入默认输出-频道="lineOutputChannel"

    <int:router ref="fileMarkerCustomRouter" inputchannel="splitterOutputChannel" default-output-channel="lineOutputChannel"/>
    

路由器代码如下所示

 @Override
    protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
        Collection<MessageChannel> targetChannels = new ArrayList<MessageChannel>();

        if (isPayloadTypeFileMarker(message)) {

            FileSplitter.FileMarker payload = (FileSplitter.FileMarker) message.getPayload();

            if (isStartOfFile(payload)) {

                targetChannels.add(nullChannel);

            } else if (isEndOfFile(payload)) {

                targetChannels.add(fileProcessedChannel);
            }
        }
        return targetChannels;
    }

但出现此错误:

Caused by: java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?

有什么想法吗?

为了这个目的,你肯定需要一个 <aggregator>release-strategy-expression="25"expire-groups-upon-completion="true" 让它在释放一个后为同一个 correlationKey 形成一个新的组。

不,确定你为什么需要 markers="true",但没有 <int-file:splitter> 填充适当的相关性 headers。因此,您甚至可以考虑仅依赖默认拆分和默认聚合。

此外,您应该考虑将聚合器的结果转换为 JSON。它发出一个 List<?>。将整个列表序列化为 JSON 非常有效。另外,在发送到 Kinesis 之前,您可能需要再进行一次转换。

因此您的配置原型应该是这样的:

<int-file:splitter id="fileLineSplitter"
                   input-channel="fileInputChannel"
                   output-channel="splitterOutputChannel"/>

<int:transformer id="csvToDataCdrTransformer"
                 ref="dataCdrLineTransformer"
                 method="transform"
                 input-channel="lineOutputChannel"
                 output-channel="aggregateChannel">
</int:transformer>

<int:aggregator input-channel="aggregateChannel" 
                output-channel="toJsonChannel"
                expire-groups-upon-completion="true" />

<int:object-to-json-transformer input-channel="toJsonChannel"
                                output-channel="kinesisSendChannel"/>

这样整个文件将被视为一个批处理。您拆分它,处理每一行,将它们聚合回列表,然后在发送到 Kinesis 之前转换为 JSON。

从这里我想请你提出一个 JIRA 来添加 ObjectToJsonTransformer.ResultType.BYTES 模式,以便更高效地使用基于 byte[] 的下游组件,例如 KinesisMessageHandler