spring 集成 - 如何将分割线聚合成 x 批次?
spring integration - how to aggregate split lines into batch of x?
我们需要将事件发送到 kinesis,由于 aws 定价,我们计划将记录分批放入 kinesis。
我们读入一个csv文件,然后使用文件拆分器将行吐出并将每一行转换为json。
那么在转换为 json 之后,我们如何将这些行分批为每批 25 行,以便我们的 kinesis serviceActivator 可以发送批次?
任何示例将不胜感激。
<int-file:splitter id="fileLineSplitter"
input-channel="fileInputChannel"
output-channel="splitterOutputChannel"
markers="true" />
<int:transformer id="csvToDataCdrTransformer"
ref="dataCdrLineTransformer"
method="transform"
input-channel="lineOutputChannel"
output-channel="dataCdrObjectInputChannel">
</int:transformer>
<int:object-to-json-transformer input-channel="dataCdrObjectInputChannel"
output-channel="kinesisSendChannel">
<int:poller fixed-delay="50"/>
</int:object-to-json-transformer>
编辑:我按照 "Artem Bilan" 的建议添加并且有效
<int:aggregator input-channel="aggregateChannel"
output-channel="toJsonChannel"
release-strategy-expression="#this.size() eq 2"
expire-groups-upon-completion="true"/>
但我收到错误:
我正在使用 markers="true" 以便我们知道文件的结尾,因此我们可以将其重命名为“.done”。
在拆分器和转换器之间添加了一个路由器,当 FileMarker 为 END 时,它简单地路由到 "nullChannel" 或 "fileProcessedChannel",否则,拆分线进入默认输出-频道="lineOutputChannel"
<int:router ref="fileMarkerCustomRouter" inputchannel="splitterOutputChannel" default-output-channel="lineOutputChannel"/>
路由器代码如下所示
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
Collection<MessageChannel> targetChannels = new ArrayList<MessageChannel>();
if (isPayloadTypeFileMarker(message)) {
FileSplitter.FileMarker payload = (FileSplitter.FileMarker) message.getPayload();
if (isStartOfFile(payload)) {
targetChannels.add(nullChannel);
} else if (isEndOfFile(payload)) {
targetChannels.add(fileProcessedChannel);
}
}
return targetChannels;
}
但出现此错误:
Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
有什么想法吗?
为了这个目的,你肯定需要一个 <aggregator>
与 release-strategy-expression="25"
和 expire-groups-upon-completion="true"
让它在释放一个后为同一个 correlationKey
形成一个新的组。
不,确定你为什么需要 markers="true"
,但没有 <int-file:splitter>
填充适当的相关性 headers。因此,您甚至可以考虑仅依赖默认拆分和默认聚合。
此外,您应该考虑将聚合器的结果转换为 JSON。它发出一个 List<?>
。将整个列表序列化为 JSON 非常有效。另外,在发送到 Kinesis 之前,您可能需要再进行一次转换。
因此您的配置原型应该是这样的:
<int-file:splitter id="fileLineSplitter"
input-channel="fileInputChannel"
output-channel="splitterOutputChannel"/>
<int:transformer id="csvToDataCdrTransformer"
ref="dataCdrLineTransformer"
method="transform"
input-channel="lineOutputChannel"
output-channel="aggregateChannel">
</int:transformer>
<int:aggregator input-channel="aggregateChannel"
output-channel="toJsonChannel"
expire-groups-upon-completion="true" />
<int:object-to-json-transformer input-channel="toJsonChannel"
output-channel="kinesisSendChannel"/>
这样整个文件将被视为一个批处理。您拆分它,处理每一行,将它们聚合回列表,然后在发送到 Kinesis 之前转换为 JSON。
从这里我想请你提出一个 JIRA 来添加 ObjectToJsonTransformer.ResultType.BYTES
模式,以便更高效地使用基于 byte[]
的下游组件,例如 KinesisMessageHandler
。
我们需要将事件发送到 kinesis,由于 aws 定价,我们计划将记录分批放入 kinesis。
我们读入一个csv文件,然后使用文件拆分器将行吐出并将每一行转换为json。
那么在转换为 json 之后,我们如何将这些行分批为每批 25 行,以便我们的 kinesis serviceActivator 可以发送批次?
任何示例将不胜感激。
<int-file:splitter id="fileLineSplitter"
input-channel="fileInputChannel"
output-channel="splitterOutputChannel"
markers="true" />
<int:transformer id="csvToDataCdrTransformer"
ref="dataCdrLineTransformer"
method="transform"
input-channel="lineOutputChannel"
output-channel="dataCdrObjectInputChannel">
</int:transformer>
<int:object-to-json-transformer input-channel="dataCdrObjectInputChannel"
output-channel="kinesisSendChannel">
<int:poller fixed-delay="50"/>
</int:object-to-json-transformer>
编辑:我按照 "Artem Bilan" 的建议添加并且有效
<int:aggregator input-channel="aggregateChannel"
output-channel="toJsonChannel"
release-strategy-expression="#this.size() eq 2"
expire-groups-upon-completion="true"/>
但我收到错误:
我正在使用 markers="true" 以便我们知道文件的结尾,因此我们可以将其重命名为“.done”。
在拆分器和转换器之间添加了一个路由器,当 FileMarker 为 END 时,它简单地路由到 "nullChannel" 或 "fileProcessedChannel",否则,拆分线进入默认输出-频道="lineOutputChannel"
<int:router ref="fileMarkerCustomRouter" inputchannel="splitterOutputChannel" default-output-channel="lineOutputChannel"/>
路由器代码如下所示
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
Collection<MessageChannel> targetChannels = new ArrayList<MessageChannel>();
if (isPayloadTypeFileMarker(message)) {
FileSplitter.FileMarker payload = (FileSplitter.FileMarker) message.getPayload();
if (isStartOfFile(payload)) {
targetChannels.add(nullChannel);
} else if (isEndOfFile(payload)) {
targetChannels.add(fileProcessedChannel);
}
}
return targetChannels;
}
但出现此错误:
Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
有什么想法吗?
为了这个目的,你肯定需要一个 <aggregator>
与 release-strategy-expression="25"
和 expire-groups-upon-completion="true"
让它在释放一个后为同一个 correlationKey
形成一个新的组。
不,确定你为什么需要 markers="true"
,但没有 <int-file:splitter>
填充适当的相关性 headers。因此,您甚至可以考虑仅依赖默认拆分和默认聚合。
此外,您应该考虑将聚合器的结果转换为 JSON。它发出一个 List<?>
。将整个列表序列化为 JSON 非常有效。另外,在发送到 Kinesis 之前,您可能需要再进行一次转换。
因此您的配置原型应该是这样的:
<int-file:splitter id="fileLineSplitter"
input-channel="fileInputChannel"
output-channel="splitterOutputChannel"/>
<int:transformer id="csvToDataCdrTransformer"
ref="dataCdrLineTransformer"
method="transform"
input-channel="lineOutputChannel"
output-channel="aggregateChannel">
</int:transformer>
<int:aggregator input-channel="aggregateChannel"
output-channel="toJsonChannel"
expire-groups-upon-completion="true" />
<int:object-to-json-transformer input-channel="toJsonChannel"
output-channel="kinesisSendChannel"/>
这样整个文件将被视为一个批处理。您拆分它,处理每一行,将它们聚合回列表,然后在发送到 Kinesis 之前转换为 JSON。
从这里我想请你提出一个 JIRA 来添加 ObjectToJsonTransformer.ResultType.BYTES
模式,以便更高效地使用基于 byte[]
的下游组件,例如 KinesisMessageHandler
。