Apache Beam IllegalArgumentException on Google Dataflow with message `Not expecting a splittable ParDoSingle: should have been overridden`
I am trying to write a pipeline that periodically checks a Google Storage bucket for new .gz files, which are actually compressed .csv files. It then writes those records to a BigQuery table. The following code worked in batch mode before I added the .watchForNewFiles(...) and .withMethod(STREAMING_INSERTS) parts. I expect it to run in streaming mode with these changes, but I am getting an exception that I cannot find anything about online. Here is my code:
public static void main(String[] args) {
    DataflowDfpOptions options = PipelineOptionsFactory.fromArgs(args)
            //.withValidation()
            .as(DataflowDfpOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    Stopwatch sw = Stopwatch.createStarted();
    log.info("DFP data transfer from GS to BQ has started.");

    pipeline.apply("ReadFromStorage", TextIO.read()
            .from("gs://my-bucket/my-folder/*.gz")
            .withCompression(Compression.GZIP)
            .watchForNewFiles(
                    // Check for new files every 30 seconds
                    Duration.standardSeconds(30),
                    // Never stop checking for new files
                    Watch.Growth.never()
            )
    )
            .apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()))
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                    .to(options.getTableId())
                    .withMethod(STREAMING_INSERTS)
                    .withCreateDisposition(CREATE_NEVER)
                    .withWriteDisposition(WRITE_APPEND)
                    .withSchema(TableSchema)); //todo: use withJsonSchema(String json) method instead

    pipeline.run().waitUntilFinish();
    log.info("DFP data transfer from GS to BQ is finished in {} seconds.", sw.elapsed(TimeUnit.SECONDS));
}
/**
* Creates a TableRow from a CSV line
*/
private static class TableRowConverterFn extends DoFn<String, TableRow> {

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        String[] split = c.element().split(",");
        // Ignore the header line.
        // Since this is going to be run in parallel, we can't guarantee that
        // the first line passed to this method will be the header.
        if (split[0].equals("Time")) {
            log.info("Skipped header");
            return;
        }
        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = TableSchema.getFields().get(i);
            // String is the most common type, so it goes in the first if clause as a small optimization.
            if (col.getType().equals("STRING")) {
                row.set(col.getName(), split[i]);
            } else if (col.getType().equals("INTEGER")) {
                row.set(col.getName(), Long.valueOf(split[i]));
            } else if (col.getType().equals("BOOLEAN")) {
                row.set(col.getName(), Boolean.valueOf(split[i]));
            } else if (col.getType().equals("FLOAT")) {
                row.set(col.getName(), Float.valueOf(split[i]));
            } else {
                // Fall back to writing the value as a String if the type is not handled above.
                //todo: Consider other BQ data types.
                row.set(col.getName(), split[i]);
            }
        }
        c.output(row);
    }
}
And the stack trace:
java.lang.IllegalArgumentException: Not expecting a splittable ParDoSingle: should have been overridden
at org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle(PrimitiveParDoSingleFactory.java:167)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate(PrimitiveParDoSingleFactory.java:145)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform(SdkComponents.java:86)
at org.apache.beam.runners.core.construction.PipelineTranslation.visitPrimitiveTransform(PipelineTranslation.java:87)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto(PipelineTranslation.java:59)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:165)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:684)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:173)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.diply.data.App.main(App.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
Here is the command I use to submit the job to Dataflow:
clean compile exec:java -Dexec.mainClass=com.my.project.App "-Dexec.args=--runner=DataflowRunner --tempLocation=gs://my-bucket/tmp --tableId=Temp.TestTable --project=my-project --jobName=dataflow-dfp-streaming" -Pdataflow-runner
I am using Apache Beam version 2.5.0. Here is the relevant part of my pom.xml:
<properties>
    <beam.version>2.5.0</beam.version>
    <bigquery.version>v2-rev374-1.23.0</bigquery.version>
    <google-clients.version>1.23.0</google-clients.version>
    ...
</properties>
Running the code on Dataflow 2.4.0 gives a more explicit error: java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn. However, this answer suggests that splittable DoFn has been supported since 2.2.0. This is indeed the case, and following this remark, you need to add the --streaming option to your -Dexec.args to force the pipeline into streaming mode. (The watchForNewFiles call is backed by Beam's Watch transform, which is implemented as a splittable DoFn; the DataflowRunner only supports splittable DoFn in streaming mode, which is why the flag is needed.)
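For illustration, the submit command from the question would then look like this (a sketch; the only change is the added --streaming flag):
clean compile exec:java -Dexec.mainClass=com.my.project.App "-Dexec.args=--runner=DataflowRunner --tempLocation=gs://my-bucket/tmp --tableId=Temp.TestTable --project=my-project --jobName=dataflow-dfp-streaming --streaming" -Pdataflow-runner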
I tested it with the code I supplied in the comments, with both your pom and mine:
1. Without --streaming, it produces your error.
2. With --streaming, it runs fine.
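As a side note, and as a sketch of my own rather than something stated in the answer: the same flag can also be set programmatically through Beam's StreamingOptions before the pipeline is created. A minimal example, assuming a standard Beam 2.5.0 setup (the class name is hypothetical):
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

public class StreamingFlagSketch {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        // Equivalent to passing --streaming on the command line.
        options.as(StreamingOptions.class).setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);
        // ... apply the transforms from the question, then pipeline.run() ...
    }
}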
You may want to open a GitHub issue against Beam, since as far as I know this behavior is not officially documented anywhere.