Google Dataflow: Request payload size exceeds the limit: 10485760 bytes

When running a large transformation on ~800,000 files, I get the above error message while trying to launch the pipeline.

Here is the code:

public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());
    GcsUtil u = getUtil(p.getOptions());

    try {
        // Expand the glob locally into the full list of ~800,000 paths.
        List<GcsPath> paths = u.expand(GcsPath.fromUri("gs://tlogdataflow/stage/*.zip"));
        List<String> strPaths = new ArrayList<String>();
        for (GcsPath pa : paths) {
            strPaths.add(pa.toUri().toString());
        }

        // Embed the whole list into the pipeline via Create.of().
        p.apply(Create.of(strPaths))
         .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));
        p.run();
    }
    catch (IOException io) {
        //
    }
}

I thought this is exactly what Google Dataflow is for? Processing large amounts of files / data?

Is there a way to split the payload so this works?

Thanks & BR

Phil

Dataflow is good at processing large amounts of data, but it has limitations on the size of the pipeline's description. Data passed to Create.of() is currently embedded in the pipeline description, so you can't pass very large amounts of data there. Instead, large amounts of data should be read from external storage, and the pipeline should specify only their locations.
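
As a hypothetical illustration (not the asker's pipeline): a source transform such as TextIO ships only the file pattern in the job request, and the workers do the actual reading:

// Only the glob string travels in the pipeline description;
// the workers read the actual file contents.
p.apply(TextIO.Read.from("gs://some-bucket/input-*.txt"));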

Think of it as the difference between the amount of data a program can process, versus the size of the program's code itself.

You can get around this issue by making the expansion happen in a ParDo:

p.apply(Create.of("gs://tlogdataflow/stage/*.zip"))
 .apply(ParDo.of(new ExpandFn()))
 .apply(...fusion break (see below)...)
 .apply(Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")))

where ExpandFn looks something like this:

private static class ExpandFn extends DoFn<String, String> {
  @ProcessElement
  public void process(ProcessContext c) throws IOException {
    // Expand the glob on the worker instead of embedding every
    // path in the pipeline description.
    GcsUtil util = getUtil(c.getPipelineOptions());
    for (GcsPath path : util.expand(GcsPath.fromUri(c.element()))) {
      c.output(path.toUri().toString());
    }
  }
}

By a fusion break I'm referring to this (basically, ParDo(add unique key) + group by key + Flatten.iterables() + Values.create()). It's not very convenient, and there are discussions happening about adding a built-in transform to do this (see this PR and this thread).
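
A minimal sketch of that fusion break, assuming the Dataflow 1.x SDK style used elsewhere in this thread (here "expanded" stands for the PCollection<String> of paths produced by ExpandFn, and the random key exists only to force a redistribution at the GroupByKey):

PCollection<String> fusionBroken = expanded
    // Pair every path with an arbitrary key...
    .apply(ParDo.of(new DoFn<String, KV<Integer, String>>() {
      @Override
      public void processElement(ProcessContext c) {
        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
      }
    }))
    // ...group to force Dataflow to materialize the collection,
    // breaking fusion with the downstream transforms...
    .apply(GroupByKey.<Integer, String>create())
    // ...then drop the keys and flatten back to individual paths.
    .apply(Values.<Iterable<String>>create())
    .apply(Flatten.<String>iterables());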

Thanks a lot! Using your input, I solved it like this:

public class ZipPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(ZipPipeline.class);

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

        try {
            p.apply(Create.of("gs://tlogdataflow/stage/*.zip"))
             .apply(ParDo.of(new ExpandFN()))             // expand the glob on the workers
             .apply(ParDo.of(new AddKeyFN()))             // key each path by month
             .apply(GroupByKey.<String, String>create())  // fusion break
             .apply(ParDo.of(new FlattenFN()))            // back to individual paths
             .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));
            p.run();
        }
        catch (Exception e) {
            LOG.error(e.getMessage());
        }
    }

    private static class FlattenFN extends DoFn<KV<String, Iterable<String>>, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) {
            // Emit every path from the grouped values individually again.
            KV<String, Iterable<String>> kv = c.element();
            for (String s : kv.getValue()) {
                c.output(s);
            }
        }
    }

    private static class ExpandFN extends DoFn<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws Exception {
            // Expand the glob on the worker instead of in the launcher.
            GcsUtil u = getUtil(c.getPipelineOptions());
            for (GcsPath path : u.expand(GcsPath.fromUri(c.element()))) {
                c.output(path.toUri().toString());
            }
        }
    }

    private static class AddKeyFN extends DoFn<String, KV<String, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) {
            String path = c.element();
            // Key by month (yyyyMM), parsed out of the file name.
            String monthKey = path.split("_")[4].substring(0, 6);
            c.output(KV.of(monthKey, path));
        }
    }
}