How to compress output files in the Dataflow Java SDK?

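My pipeline stores its output data files in GCS, and I would like to compress them. TextIO decompresses compressed input files, but as far as I can tell it does not compress its output. How can I compress the output files?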

TextIO only supports reading compressed files; writing compressed files is not supported. From the documentation:

https://cloud.google.com/dataflow/model/text-io#reading-from-compressed-text-files

TextIO does not currently support writing to compressed files.

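More information:

This is currently an open feature request for Dataflow, but the work has already been done in Beam. Once Dataflow 2.0 is released (it will be based on Beam), this should be officially supported.

That said, I have been able to write compressed GZIP files by extending the FileBasedSink class and borrowing from the work Jeff Payne did for this feature in Beam.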

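// Imports assume the Dataflow Java SDK 1.x package layout (com.google.cloud.dataflow.sdk).
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.FileBasedSink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;

public class GZIPSink<T> extends FileBasedSink<T> {
    private final Coder<T> coder;

    GZIPSink(String baseOutputFilename, Coder<T> coder) {
        super(baseOutputFilename, ".gz");
        this.coder = coder;
    }

    @Override
    public FileBasedWriteOperation<T> createWriteOperation(PipelineOptions pipelineOptions) {
        return new GZIPWriteOperation<T>(this, coder);
    }

    static class GZIPWriteOperation<T> extends FileBasedSink.FileBasedWriteOperation<T> {
        private final Coder<T> coder;

        private GZIPWriteOperation(GZIPSink<T> sink, Coder<T> coder) {
            super(sink);
            this.coder = coder;
        }

        @Override
        public FileBasedWriter<T> createWriter(PipelineOptions pipelineOptions) throws Exception {
            return new GZIPBasedWriter<T>(this, coder);
        }
    }

    static class GZIPBasedWriter<T> extends FileBasedSink.FileBasedWriter<T> {
        private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        private final Coder<T> coder;
        private GZIPOutputStream out;

        public GZIPBasedWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
            super(writeOperation);
            this.mimeType = MimeTypes.BINARY;
            this.coder = coder;
        }

        @Override
        protected void prepareWrite(WritableByteChannel channel) throws Exception {
            // The anonymous subclass exposes the protected Deflater field "def",
            // which lets us raise the compression level before any data is written.
            out = new GZIPOutputStream(Channels.newOutputStream(channel), true) {{
                def.setLevel(Deflater.BEST_COMPRESSION);
            }};
        }

        @Override
        public void write(T value) throws Exception {
            // One encoded element per line.
            coder.encode(value, out, Coder.Context.OUTER);
            out.write(NEWLINE);
        }

        @Override
        public void writeFooter() throws IOException {
            // Write the GZIP trailer without closing the underlying channel.
            out.finish();
        }
    }
}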

Then, to actually do the write:

aStringPCollection.apply(Write.to(new GZIPSink<String>("gs://path/sharded-filename", StringUtf8Coder.of())));
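For anyone who wants to see what the writer above does to the bytes without running a pipeline, here is a minimal sketch that uses only JDK classes. It mirrors the prepareWrite / write / writeFooter steps (GZIP stream at best compression, one UTF-8 record per line, then finish()) and reads the result back. The class name GzipLinesSketch and its two helper methods are hypothetical names made up for this illustration; they are not part of the Dataflow or Beam SDKs, and an in-memory buffer stands in for the GCS channel.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper class for illustration only (not an SDK class).
class GzipLinesSketch {
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    /** Writes each record as one UTF-8 line into a GZIP stream and returns the bytes. */
    static byte[] gzipLines(List<String> records) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Same trick as in the sink: the anonymous subclass can reach the
        // protected Deflater field "def" to set the compression level.
        GZIPOutputStream out = new GZIPOutputStream(sink, true) {{
            def.setLevel(Deflater.BEST_COMPRESSION);
        }};
        for (String record : records) {
            out.write(record.getBytes(StandardCharsets.UTF_8));
            out.write(NEWLINE);
        }
        // finish() writes the GZIP trailer but leaves the underlying stream open.
        out.finish();
        return sink.toByteArray();
    }

    /** Decompresses GZIP bytes and splits them back into lines. */
    static List<String> gunzipLines(byte[] compressed) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(compressed)),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        List<String> records = Arrays.asList("first record", "second record");
        byte[] compressed = gzipLines(records);
        // Prints whether the round trip preserved the records.
        System.out.println(gunzipLines(compressed).equals(records));
    }
}
```

Calling finish() rather than close() matters in the sink because, as I understand it, the framework owns the output channel and closes it itself; in this sketch the distinction is harmless since the target is an in-memory buffer.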

As Thang mentioned, this can now be done by adding .withCompression(Compression.GZIP).

This is implemented in version 2 of the Beam SDK:
// Without Compression:
pcollection.apply(TextIO.write().to("/path/to/file.txt"));

// With Compression:
pcollection.apply(TextIO.write().to("/path/to/file.txt")
      .withSuffix(".txt")
      .withCompression(Compression.GZIP));

A complete example can be found in the docs.