JSON 从 Flux 到 S3 的序列化中的内存高效元素处理

Memory-Efficient Element Handling in JSON Serialisation from Flux to S3

我正在从 Flux 向 S3 写入一个大型 JSON 数组。一次收集和序列化整个对象流对内存有很大的影响。这导致我将其重新解释为多部分上传,代码大致如下所示:

results
    .map(this::serialize)
    .map(
         bytes ->
             uploadBytes(
                         bytes,
                         filename,
                         bucket,
                         index.getAndIncrement(),
                         uploadId))

这意味着在任何给定时间,只有 results 的单个元素需要在内存中序列化。这表面上有效,但不会产生有效的 JSON,因为组合文件不是逗号分隔或用方括号括起来的。

我们可以添加额外的逻辑来检查上传的索引,以便第一个元素前置一个 [,并且每个其他元素前置一个 ,。给出结构:

[result1 + ,result2 + ... + ,resultX + ...

可能由以下代码决定:

ByteArrayOutputStream output = new ByteArrayOutputStream();
if (index == 1) {
    output.write('[');
} else {
    output.write(',');
}
output.write(bytes);

这个策略仍然省略了最后一个括号,因为我们不知道当前元素是否是最后一个元素。 S3 文件部分的最小大小也为 5mb。最坏的情况是最终上传 ] 并填充 5mb 的空白。

是否有一种惯用的方法来确定 Flux 中的任何给定元素是否是最后一个,并且是否直接跟随一个完整的信号?

为了实现这一点,我简单地写了一些大意为:

results
    .map(this::serialize)
    .concatWithValues((byte) ']')

然后将这些以 5MB 的缓冲区(而不是每个元素)上传到 S3。因此,当 5MB 缓冲区已满时,可以延迟序列化元素。