Google Dataflow: Coder for ZipInputStream
I am trying to unzip some files (which themselves contain zip files) from Google Storage to Google Storage.
Therefore I have the following DoFn to collect the ZipInputStreams:
static class UnzipFilesFN extends DoFn<GcsPath, ZipInputStream> {
    private static final long serialVersionUID = 7373250969860890761L;

    @Override
    public void processElement(ProcessContext c) {
        GcsPath p = c.element();
        try {
            ZipInputStream zis = new ZipInputStream(new FileInputStream(p.toString()));
            c.output(zis);
        } catch (FileNotFoundException fnfe) {
            //
        }
    }
}
And the following custom Sink to do the unzipping and writing:
public static class ZipIO {
    public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<ZipInputStream> {
        private static final long serialVersionUID = -7414200726778377175L;
        final String unzipTarget;

        public Sink withDestinationPath(String s) {
            if (!s.isEmpty()) {
                return new Sink(s);
            } else {
                throw new IllegalArgumentException("must assign destination path");
            }
        }

        protected Sink(String path) {
            this.unzipTarget = path;
        }

        @Override
        public void validate(PipelineOptions po) {
            if (unzipTarget == null) {
                throw new RuntimeException("destination path must be set");
            }
        }

        @Override
        public ZipFileWriteOperation createWriteOperation(PipelineOptions po) {
            return new ZipFileWriteOperation(this);
        }
    }

    private static class ZipFileWriteOperation extends WriteOperation<ZipInputStream, UnzipResult> {
        private static final long serialVersionUID = 7976541367499831605L;
        private final ZipIO.Sink sink;

        public ZipFileWriteOperation(ZipIO.Sink sink) {
            this.sink = sink;
        }

        @Override
        public void initialize(PipelineOptions po) throws Exception {
        }

        @Override
        public void finalize(Iterable<UnzipResult> writerResults, PipelineOptions po) throws Exception {
            // Aggregate the per-writer results into one total.
            long totalFiles = 0;
            for (UnzipResult r : writerResults) {
                totalFiles += r.filesUnzipped;
            }
            LOG.info("Unzipped {} Files", totalFiles);
        }

        @Override
        public ZipIO.Sink getSink() {
            return sink;
        }

        @Override
        public ZipWriter createWriter(PipelineOptions po) throws Exception {
            return new ZipWriter(this);
        }
    }

    private static class ZipWriter extends Writer<ZipInputStream, UnzipResult> {
        private final ZipFileWriteOperation writeOp;
        private long totalUnzipped = 0;

        ZipWriter(ZipFileWriteOperation writeOp) {
            this.writeOp = writeOp;
        }

        @Override
        public void open(String uID) throws Exception {
        }

        @Override
        public void write(ZipInputStream zis) {
            byte[] buffer = new byte[1024];
            try {
                // Iterate over all entries in the archive, extracting each
                // one to the configured destination path.
                ZipEntry ze;
                while ((ze = zis.getNextEntry()) != null) {
                    File f = new File(writeOp.sink.unzipTarget + "/" + ze.getName());
                    FileOutputStream fos = new FileOutputStream(f);
                    int len;
                    while ((len = zis.read(buffer)) > 0) {
                        fos.write(buffer, 0, len);
                    }
                    fos.close();
                    zis.closeEntry();
                    this.totalUnzipped++;
                }
                zis.close();
            } catch (Exception e) {
                //
            }
        }

        @Override
        public UnzipResult close() throws Exception {
            return new UnzipResult(this.totalUnzipped);
        }

        @Override
        public ZipFileWriteOperation getWriteOperation() {
            return writeOp;
        }
    }

    private static class UnzipResult implements Serializable {
        private static final long serialVersionUID = -8504626439217544799L;
        final long filesUnzipped;

        public UnzipResult(long filesUnzipped) {
            this.filesUnzipped = filesUnzipped;
        }
    }
}
When I try to run the pipeline, I get the following error:
Building a Coder from the fallback CoderProvider failed: Cannot provide coder for type java.util.zip.ZipInputStream: com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder@5717c37 could not provide a Coder for type java.util.zip.ZipInputStream: Cannot provide ProtoCoder because java.util.zip.ZipInputStream is not a subclass of com.google.protobuf.Message; com.google.cloud.dataflow.sdk.coders.SerializableCoder@68f4865 could not provide a Coder for type java.util.zip.ZipInputStream: Cannot provide SerializableCoder because java.util.zip.ZipInputStream does not implement Serializable.
at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195)
at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137)
at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:332)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:291)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174)
Which coder do I need to assign to handle the ZipInputStreams?
Thanks & BR,
Philipp
Coders are necessary so that a runner can materialize the PCollection to temporary storage and read it back, rather than having to keep it in memory. I can't think of a reasonable way to materialize a ZipInputStream object - this is a fundamental conceptual issue, not a shortcoming of the Coder API.
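To see why, it helps to look at the shape of the Coder contract, abbreviated and simplified here from com.google.cloud.dataflow.sdk.coders.Coder in the Dataflow SDK 1.x:

// Simplified: every element of a PCollection must round-trip through bytes.
public interface Coder<T> extends Serializable {
    void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException;
    T decode(InputStream inStream, Context context) throws CoderException, IOException;
}

A ZipInputStream wraps a live stream over an open file handle, so there is no byte representation from which decode() could reconstruct an equivalent object on another worker.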
However, in your particular case, I think you can simply open the ZipInputStream inside your ZipWriter.write() function, and make ZipIO.Sink a Sink<GcsPath> rather than a Sink<ZipInputStream>.
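A minimal sketch of that restructuring, assuming everything else in ZipIO stays as above (the type parameters of WriteOperation and Writer change from ZipInputStream to GcsPath accordingly; FileInputStream is kept here only to mirror the original code):

public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<GcsPath> {
    // ... fields, constructor, validate() and createWriteOperation() as above ...
}

private static class ZipWriter extends Writer<GcsPath, UnzipResult> {
    // ... fields, constructor, open(), close() and getWriteOperation() as above ...

    @Override
    public void write(GcsPath p) throws Exception {
        // Open the stream on the worker, instead of shipping a live
        // stream through the PCollection.
        try (ZipInputStream zis = new ZipInputStream(new FileInputStream(p.toString()))) {
            byte[] buffer = new byte[1024];
            ZipEntry ze;
            while ((ze = zis.getNextEntry()) != null) {
                File f = new File(writeOp.sink.unzipTarget + "/" + ze.getName());
                try (FileOutputStream fos = new FileOutputStream(f)) {
                    int len;
                    while ((len = zis.read(buffer)) > 0) {
                        fos.write(buffer, 0, len);
                    }
                }
                zis.closeEntry();
                this.totalUnzipped++;
            }
        }
    }
}

The coder problem then disappears, because the pipeline only carries GcsPath elements; GcsPath is Serializable, so a coder such as SerializableCoder.of(GcsPath.class) should be inferable without any custom work.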
One other thing I noticed in your code: I presume you plan to use this with files located on GCS and with the Cloud Dataflow runner, rather than only with the in-memory runner and local files. In that case, java.io.File will not transparently handle reading/writing to GCS - you need to use GcsUtil for that.
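For illustration, a sketch of opening a GCS object for reading through GcsUtil. The calls below (GcsOptions.getGcsUtil(), GcsUtil.open(), GcsPath.fromUri()) are from the Dataflow SDK 1.x as I recall it, and the bucket/object URI is a hypothetical placeholder - verify the exact signatures against your SDK version:

import java.io.InputStream;
import java.nio.channels.Channels;
import java.util.zip.ZipInputStream;
import com.google.cloud.dataflow.sdk.options.GcsOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;

static ZipInputStream openZipOnGcs(PipelineOptions options, GcsPath path) throws java.io.IOException {
    // GcsUtil exposes GCS objects as NIO channels.
    GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
    InputStream in = Channels.newInputStream(gcsUtil.open(path));
    return new ZipInputStream(in);
}

// Hypothetical usage, e.g. from ZipWriter.write(), with the PipelineOptions
// passed down from createWriter():
//   ZipInputStream zis = openZipOnGcs(po, GcsPath.fromUri("gs://my-bucket/archive.zip"));
// Writing works analogously: gcsUtil.create(path, contentType) returns a
// WritableByteChannel that Channels.newOutputStream() can wrap.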