How to compress output files in the Dataflow Java SDK?
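My pipeline writes its output data files to GCS.
I want to compress these files.
TextIO decompresses compressed files when reading,
but as far as I can tell it does not compress files when writing.
How can I compress the output files?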
TextIO only supports reading compressed files. Writing compressed files is not supported. From the documentation:
https://cloud.google.com/dataflow/model/text-io#reading-from-compressed-text-files
"TextIO does not currently support writing to compressed files."
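More information:
This is currently an open feature request for Dataflow, but the work has already been done in Beam. Once Dataflow 2.0 is released (it will be based on Beam), this should be officially supported.
That said, I have been able to write compressed GZIP files by extending the FileBasedSink class, building on the work Jeff Payne did on this feature in Beam.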
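    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.Deflater;
    import java.util.zip.GZIPOutputStream;
    // Dataflow SDK 1.x imports (package names assumed; adjust to your SDK version).
    import com.google.cloud.dataflow.sdk.coders.Coder;
    import com.google.cloud.dataflow.sdk.io.FileBasedSink;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.util.MimeTypes;

    public class GZIPSink<T> extends FileBasedSink<T> {
        private final Coder<T> coder;

        GZIPSink(String baseOutputFilename, Coder<T> coder) {
            super(baseOutputFilename, ".gz");
            this.coder = coder;
        }

        @Override
        public FileBasedWriteOperation<T> createWriteOperation(PipelineOptions pipelineOptions) {
            return new GZIPWriteOperation<>(this, coder);
        }

        static class GZIPWriteOperation<T> extends FileBasedSink.FileBasedWriteOperation<T> {
            private final Coder<T> coder;

            private GZIPWriteOperation(GZIPSink<T> sink, Coder<T> coder) {
                super(sink);
                this.coder = coder;
            }

            @Override
            public FileBasedWriter<T> createWriter(PipelineOptions pipelineOptions) throws Exception {
                return new GZIPBasedWriter<>(this, coder);
            }
        }

        static class GZIPBasedWriter<T> extends FileBasedSink.FileBasedWriter<T> {
            private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
            private final Coder<T> coder;
            private GZIPOutputStream out;

            public GZIPBasedWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
                super(writeOperation);
                this.mimeType = MimeTypes.BINARY;
                this.coder = coder;
            }

            @Override
            protected void prepareWrite(WritableByteChannel channel) throws Exception {
                // Wrap the output channel in a GZIP stream (syncFlush = true) and
                // raise the inherited Deflater ("def") to the highest compression level.
                out = new GZIPOutputStream(Channels.newOutputStream(channel), true) {{
                    def.setLevel(Deflater.BEST_COMPRESSION);
                }};
            }

            @Override
            public void write(T value) throws Exception {
                // One encoded element per line.
                coder.encode(value, out, Coder.Context.OUTER);
                out.write(NEWLINE);
            }

            @Override
            public void writeFooter() throws IOException {
                // Write the GZIP trailer without closing the underlying channel.
                out.finish();
            }
        }
    }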
Then, to actually write:
    aStringPCollection.apply(Write.to(new GZIPSink<>("gs://path/sharded-filename", StringUtf8Coder.of())));
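To see what the writer above does to each shard without running a pipeline, here is a minimal sketch of the same GZIP technique in plain Java, with no Dataflow classes involved. The class name GzipLinesDemo and its methods are hypothetical and only for illustration. An in-memory buffer stands in for the GCS channel and raw UTF-8 bytes stand in for the Coder.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper class: not part of the Dataflow or Beam SDKs.
final class GzipLinesDemo {
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    /** Wraps the target in a GZIP stream set to the highest compression level. */
    static GZIPOutputStream bestCompression(OutputStream target) throws IOException {
        // "def" is the protected Deflater inherited from DeflaterOutputStream;
        // the initializer block of the anonymous subclass can reach it.
        return new GZIPOutputStream(target, true) {
            { def.setLevel(Deflater.BEST_COMPRESSION); }
        };
    }

    /** Writes one record per line, then the GZIP trailer. */
    static byte[] compressLines(List<String> lines) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        GZIPOutputStream out = bestCompression(buffer);
        for (String line : lines) {
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.write(NEWLINE);
        }
        out.finish(); // same role as writeFooter() in the sink above
        return buffer.toByteArray();
    }

    /** Reads the compressed bytes back into lines. */
    static List<String> decompressLines(byte[] gzipped) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipped)),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        List<String> input = Arrays.asList("first record", "second record");
        byte[] gzipped = compressLines(input);
        System.out.println(decompressLines(gzipped).equals(input));
    }
}
```

Calling finish() rather than close() matters in the sink: it completes the GZIP stream but leaves the underlying channel open, so whoever owns the channel can still close it.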
As Thang mentioned, in Beam SDK version 2 this can now be done by adding .withCompression(Compression.GZIP):
    // Without compression:
    pcollection.apply(TextIO.write().to("/path/to/file.txt"));

    // With compression:
    pcollection.apply(TextIO.write().to("/path/to/file.txt")
        .withSuffix(".txt")
        .withCompression(Compression.GZIP));
A complete example can be found in the docs.