JSON 从 Flux 到 S3 的序列化中的内存高效元素处理
Memory-Efficient Element Handling in JSON Serialisation from Flux to S3
我正在从 Flux
向 S3 写入一个大型 JSON 数组。一次收集和序列化整个对象流对内存有很大的影响。这导致我将其重新解释为多部分上传,代码大致如下所示:
results
.map(this::serialize)
.map(
bytes ->
uploadBytes(
bytes,
filename,
bucket,
index.getAndIncrement(),
uploadId))
这意味着在任何给定时间,只有 results
的单个元素需要在内存中序列化。这表面上有效,但不会产生有效的 JSON,因为组合文件不是逗号分隔或用方括号括起来的。
我们可以添加额外的逻辑来检查上传的索引,以便第一个元素前置一个 [
,并且每个其他元素前置一个 ,
。给出结构:
[result1
+ ,result2
+ ... + ,resultX
+ ...
可能由以下代码决定:
ByteArrayOutputStream output = new ByteArrayOutputStream();
if (index == 1) {
output.write('[');
} else {
output.write(',');
}
output.write(bytes);
这个策略仍然省略了最后一个括号,因为我们不知道当前元素是否是最后一个元素。 S3 文件部分的最小大小也为 5mb。最坏的情况是最终上传 ]
并填充 5mb 的空白。
是否有一种惯用的方法来确定 Flux
中的任何给定元素是否是最后一个,并且是否直接跟随一个完整的信号?
为了实现这一点,我简单地写了一些大意为:
results
.map(this::serialize)
.concatWithValues((byte) ']')
然后将这些以 5MB 的缓冲区(而不是每个元素)上传到 S3。因此,当 5MB 缓冲区已满时,可以延迟序列化元素。
我正在从 Flux
向 S3 写入一个大型 JSON 数组。一次收集和序列化整个对象流对内存有很大的影响。这导致我将其重新解释为多部分上传,代码大致如下所示:
results
.map(this::serialize)
.map(
bytes ->
uploadBytes(
bytes,
filename,
bucket,
index.getAndIncrement(),
uploadId))
这意味着在任何给定时间,只有 results
的单个元素需要在内存中序列化。这表面上有效,但不会产生有效的 JSON,因为组合文件不是逗号分隔或用方括号括起来的。
我们可以添加额外的逻辑来检查上传的索引,以便第一个元素前置一个 [
,并且每个其他元素前置一个 ,
。给出结构:
[result1
+ ,result2
+ ... + ,resultX
+ ...
可能由以下代码决定:
ByteArrayOutputStream output = new ByteArrayOutputStream();
if (index == 1) {
output.write('[');
} else {
output.write(',');
}
output.write(bytes);
这个策略仍然省略了最后一个括号,因为我们不知道当前元素是否是最后一个元素。 S3 文件部分的最小大小也为 5mb。最坏的情况是最终上传 ]
并填充 5mb 的空白。
是否有一种惯用的方法来确定 Flux
中的任何给定元素是否是最后一个,并且是否直接跟随一个完整的信号?
为了实现这一点,我简单地写了一些大意为:
results
.map(this::serialize)
.concatWithValues((byte) ']')
然后将这些以 5MB 的缓冲区(而不是每个元素)上传到 S3。因此,当 5MB 缓冲区已满时,可以延迟序列化元素。