Google Dataflow: Request payload size exceeds the limit: 10485760 bytes
When trying to run a large transform on ~800,000 files, I get the above error message when attempting to launch the pipeline.
Here is the code:
public static void main(String[] args) {
  Pipeline p = Pipeline.create(
      PipelineOptionsFactory.fromArgs(args).withValidation().create());
  GcsUtil u = getUtil(p.getOptions());

  try {
    List<GcsPath> paths = u.expand(GcsPath.fromUri("gs://tlogdataflow/stage/*.zip"));
    List<String> strPaths = new ArrayList<String>();
    for (GcsPath pa : paths) {
      strPaths.add(pa.toUri().toString());
    }

    p.apply(Create.of(strPaths))
     .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));
    p.run();
  } catch (IOException io) {
    //
  }
}
I thought that's exactly what Google Dataflow is for: handling huge amounts of files / data? Is there a way to split the payload to make it work?
Thanks & BR,
Phil
Dataflow is good at processing huge amounts of data, but has limitations in terms of the size of the pipeline description. Data passed to Create.of() is currently embedded in the pipeline description, so you can't pass very large amounts of data there - instead, large amounts of data should be read from external storage, and the pipeline should specify only their locations.
Think of it as the difference between the amount of data a program can process vs. the size of the program's code itself.
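To see the scale of the problem: if each of the ~800,000 paths serializes to around 100 bytes (an assumed average, not a measured one), Create.of() embeds roughly 80 MB into the pipeline description - nearly eight times the 10485760-byte (10 MB) request limit in the error message.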
You can work around this by doing the expansion inside a ParDo:
p.apply(Create.of("gs://tlogdataflow/stage/*.zip"))
 .apply(ParDo.of(new ExpandFn()))
 .apply(...fusion break (see below)...)
 .apply(Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")))
where ExpandFn looks something like this:
private static class ExpandFn extends DoFn<String, String> {
  @ProcessElement
  public void process(ProcessContext c) {
    GcsUtil util = getUtil(c.getPipelineOptions());
    // expand() returns GcsPath objects, so convert back to strings for output.
    for (GcsPath path : util.expand(GcsPath.fromUri(c.element()))) {
      c.output(path.toUri().toString());
    }
  }
}
And by "fusion break" I'm referring to this (basically, ParDo(add unique key) + GroupByKey + Values.create() + Flatten.iterables()). It's not very convenient and there are discussions happening about adding a built-in transform to do this (see this PR and this thread).
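A minimal sketch of that fusion break, written against the Dataflow 1.x-style DoFn API used elsewhere in this thread (the random int key is an assumption of this sketch; any key that spreads elements across the keyspace does the job):

PCollection<String> expanded = ...; // e.g. the output of ParDo.of(new ExpandFn())
PCollection<String> reshuffled = expanded
    .apply(ParDo.of(new DoFn<String, KV<Integer, String>>() {
      @Override
      public void processElement(ProcessContext c) {
        // Arbitrary key: its only job is to force a shuffle so that
        // downstream work is redistributed across workers.
        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
      }
    }))
    .apply(GroupByKey.<Integer, String>create())  // materializes the shuffle
    .apply(Values.<Iterable<String>>create())     // KV<K, Iterable<V>> -> Iterable<V>
    .apply(Flatten.<String>iterables());          // Iterable<V> -> V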
Thanks a lot! Using your input I solved it like this:
public class ZipPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(ZipPipeline.class);

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    try {
      p.apply(Create.of("gs://tlogdataflow/stage/*.zip"))
       .apply(ParDo.of(new ExpandFN()))
       .apply(ParDo.of(new AddKeyFN()))
       .apply(GroupByKey.<String, String>create())
       .apply(ParDo.of(new FlattenFN()))
       .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));
      p.run();
    } catch (Exception e) {
      LOG.error(e.getMessage());
    }
  }

  // Emits each path of a grouped KV back out as an individual element.
  private static class FlattenFN extends DoFn<KV<String, Iterable<String>>, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) {
      KV<String, Iterable<String>> kv = c.element();
      for (String s : kv.getValue()) {
        c.output(s);
      }
    }
  }

  // Expands the glob into individual GCS paths at execution time,
  // so the paths never end up in the pipeline description.
  private static class ExpandFN extends DoFn<String, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) throws Exception {
      GcsUtil u = getUtil(c.getPipelineOptions());
      for (GcsPath path : u.expand(GcsPath.fromUri(c.element()))) {
        c.output(path.toUri().toString());
      }
    }
  }

  // Keys each path by a month string (yyyyMM) parsed out of the file name.
  private static class AddKeyFN extends DoFn<String, KV<String, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) {
      String path = c.element();
      String monthKey = path.split("_")[4].substring(0, 6);
      c.output(KV.of(monthKey, path));
    }
  }
}
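One note on this solution: keying by month (AddKeyFN) breaks fusion just as a random key would, but it caps the parallelism of the stage after the GroupByKey at the number of distinct months, and path.split("_")[4].substring(0, 6) assumes the staged file names carry a yyyyMM date in their fifth underscore-separated field (an assumption inferred from the code, not stated in the thread) - a name that deviates from that pattern will make the DoFn throw.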