无法 运行 具有 OutputTags 的作业
Unable to run a job with OutputTags
我一直在为需要发出副输出的工作而苦苦挣扎,因为我不断收到异常 ('unable to serialize xxx')。
即使我明确指定了我正在使用的类型的编码器,我仍然遇到同样的错误,所以我决定按照这个文档编写一个简单的工作:
https://cloud.google.com/dataflow/model/par-do#tags-for-side-outputs
令我惊讶的是,我仍然得到同样的异常,现在我怀疑我一定是做错了什么(但我自己也想不通)。就代码而言,我尝试按照上面给出的示例进行操作。
下面,我正在 posting 源代码以及 运行 时收到的错误消息。我相信这是可重现的(将 'GCS_BUCKET' 更改为您拥有的任何存储桶,并创建使用 args 调用 'TestSideOutput' 的 main() 方法),最好知道其他人是否可以重现他们的结尾。
我们正在使用 JDK 8 和 Dataflow SDK 1.7.0.
请注意,上面文档中的示例使用了匿名 class 扩展 DoFn,我也尝试过但得到了相同类型的错误消息;下面的代码将此 class 重构为命名的 inner-class ('Filter')。
我还尝试在不使用花括号(“{}”)的情况下初始化 TupleTags——因为这实际上会产生警告——从而导致异常(请参阅此 post 中的最后一个代码片段)。
这是我使用的代码:
package tmp.dataflow.experimental;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import com.moloco.dataflow.DataflowConstants;
public class TestSideOutput {
private TestOptions options;
private static final String GCS_BUCKET = "gs://dataflow-experimental/"; // Change to your bucket name
public TestSideOutput(String[] args) {
options = PipelineOptionsFactory.fromArgs(args).as(TestOptions.class);
options.setProject(DataflowConstants.PROJCET_NAME);
options.setStagingLocation(DataflowConstants.STAGING_BUCKET);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setJobName(options.getJob() + "-test-sideoutput");
}
public void execute() {
Pipeline pipeline = Pipeline.create(options);
// 1. Read sample data.
PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading")
.from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of()));
// 2. Create tags for outputs.
final TupleTag<String> mainTag = new TupleTag<String>() {};
final TupleTag<String> sideTag = new TupleTag<String>() {};
// 3. Apply ParDo with side output tags.
Filter filter = new Filter("DATAFLOW", sideTag);
PCollectionTuple results =
profiles.apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter));
// 4. Retrieve outputs.
PCollection<String> mainOutput = results.get(mainTag);
PCollection<String> sideOutput = results.get(sideTag);
// 5. Write to GCS.
mainOutput.apply(
TextIO.Write.named("writingMain").to(GCS_BUCKET + "example/main-output/main").withCoder(StringUtf8Coder.of()));
sideOutput.apply(
TextIO.Write.named("writingSide").to(GCS_BUCKET + "example/side-output/side").withCoder(StringUtf8Coder.of()));
// 6. Run pipeline.
pipeline.run();
}
static class Filter extends DoFn<String, String> {
private static final long serialVersionUID = 0;
final TupleTag<String> sideTag;
String keyword;
public Filter(String keyword, TupleTag<String> sideTag) {
this.sideTag = sideTag;
this.keyword = keyword;
}
@Override
public void processElement(ProcessContext c) throws Exception {
String profile = c.element();
if (profile.contains(keyword)) {
c.output(profile);
} else {
c.sideOutput(sideTag, profile);
}
}
}
}
这是我使用的命令,也是我得到的 error/exception(它只包含一些我们用于数据流包的命令行参数,这里没有什么特别的,只是为了给你一个想法):
dataflow-20161003.R3$ ./bin/dataflow --job=test-experimental-sideoutput --numWorkers=1 --date=0001-01-01
Oct 04, 2016 12:37:34 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 121 files. Enable logging at DEBUG level to see which files will be staged.
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize tmp.dataflow.experimental.TestSideOutput$Filter@6986852
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91)
at com.google.cloud.dataflow.sdk.transforms.ParDo$BoundMulti.<init>(ParDo.java:959)
at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:912)
at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:908)
at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:41)
at com.moloco.dataflow.Main.main(Main.java:152)
Caused by: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
... 6 more
此外,我认为这无关紧要,但是 'TestOptions' class 的代码:
package tmp.dataflow.experimental;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.Validation;
public interface TestOptions extends DataflowPipelineOptions {
@Description("Job")
@Validation.Required
String getJob();
void setJob(String value);
@Description("Job suffix")
String getJobSuffix();
void setJobSuffix(String value);
@Description("Date")
@Validation.Required
String getDate();
void setDate(String value);
}
最后,如果我在实例化 TupleTags 时删除大括号“{}”,我会得到以下异常(并且我在 Whosebug 上发现我应该用“{}”实例化它们以避免这种情况的建议问题类型):
Oct 04, 2016 12:43:56 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 122 files. Enable logging at DEBUG level to see which files will be staged.
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for FilterByKeyword.out1 [PCollection]. Correct one of the following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. If this error occurs for a side output of the producing ParDo, verify that the TupleTag for this output is constructed with proper type information (see TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.
Using the default output Coder from the producing PTransform failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure.
at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195)
at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137)
at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:50)
at com.moloco.dataflow.Main.main(Main.java:152)
编辑:请参阅下面的答案,通过执行 execute() 'static'.
来解决此问题
下面的代码类似于我最初 post 编写的代码,但有两处更改:
只要有可能,我都会为每个 PCollection 再次明确(和冗余)指定 'coder'。此外,在实例化 TupleTags 时,没有花括号。
注意确定哪种方法(静态方法与这种冗余方法)更合适。
public void execute() {
Pipeline pipeline = Pipeline.create(options);
// 1. Read sample data.
PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading")
.from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of()));
// 2. Create tags for outputs.
final TupleTag<String> mainTag = new TupleTag<String>();
final TupleTag<String> sideTag = new TupleTag<String>();
// 3. Apply ParDo with side output tags.
Filter filter = new Filter("DATAFLOW", sideTag);
PCollectionTuple results = profiles.setCoder(StringUtf8Coder.of())
.apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter));
// 4. Retrieve outputs.
PCollection<String> mainOutput = results.get(mainTag);
PCollection<String> sideOutput = results.get(sideTag);
// 5. Write to GCS.
mainOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingMain")
.to(GCS_BUCKET + "example/main-output-from-nonstatic/main").withCoder(StringUtf8Coder.of()));
sideOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingSide")
.to(GCS_BUCKET + "example/side-output-from-nonstatic/side").withCoder(StringUtf8Coder.of()));
// 6. Run pipeline.
pipeline.run();
}
您遇到的错误是因为您的 Filter
fn 引用了 TupleTag
,而后者又(因为它是从非静态实例化的非静态匿名 class函数 execute()
) 引用封闭的 TestSideOutput
.
所以管道正在尝试序列化 TestSideOutput
对象,但它是不可序列化的 - 如消息所示:java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput
.
根本原因是方法execute()
不是静态的。将其设置为静态应该可以解决问题。
我一直在为需要发出副输出的工作而苦苦挣扎,因为我不断收到异常 ('unable to serialize xxx')。
即使我明确指定了我正在使用的类型的编码器,我仍然遇到同样的错误,所以我决定按照这个文档编写一个简单的工作:
https://cloud.google.com/dataflow/model/par-do#tags-for-side-outputs
令我惊讶的是,我仍然得到同样的异常,现在我怀疑我一定是做错了什么(但我自己也想不通)。就代码而言,我尝试按照上面给出的示例进行操作。
下面,我正在 posting 源代码以及 运行 时收到的错误消息。我相信这是可重现的(将 'GCS_BUCKET' 更改为您拥有的任何存储桶,并创建使用 args 调用 'TestSideOutput' 的 main() 方法),最好知道其他人是否可以重现他们的结尾。 我们正在使用 JDK 8 和 Dataflow SDK 1.7.0.
请注意,上面文档中的示例使用了匿名 class 扩展 DoFn,我也尝试过但得到了相同类型的错误消息;下面的代码将此 class 重构为命名的 inner-class ('Filter')。
我还尝试在不使用花括号(“{}”)的情况下初始化 TupleTags——因为这实际上会产生警告——从而导致异常(请参阅此 post 中的最后一个代码片段)。
这是我使用的代码:
package tmp.dataflow.experimental;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import com.moloco.dataflow.DataflowConstants;
public class TestSideOutput {
private TestOptions options;
private static final String GCS_BUCKET = "gs://dataflow-experimental/"; // Change to your bucket name
public TestSideOutput(String[] args) {
options = PipelineOptionsFactory.fromArgs(args).as(TestOptions.class);
options.setProject(DataflowConstants.PROJCET_NAME);
options.setStagingLocation(DataflowConstants.STAGING_BUCKET);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setJobName(options.getJob() + "-test-sideoutput");
}
public void execute() {
Pipeline pipeline = Pipeline.create(options);
// 1. Read sample data.
PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading")
.from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of()));
// 2. Create tags for outputs.
final TupleTag<String> mainTag = new TupleTag<String>() {};
final TupleTag<String> sideTag = new TupleTag<String>() {};
// 3. Apply ParDo with side output tags.
Filter filter = new Filter("DATAFLOW", sideTag);
PCollectionTuple results =
profiles.apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter));
// 4. Retrieve outputs.
PCollection<String> mainOutput = results.get(mainTag);
PCollection<String> sideOutput = results.get(sideTag);
// 5. Write to GCS.
mainOutput.apply(
TextIO.Write.named("writingMain").to(GCS_BUCKET + "example/main-output/main").withCoder(StringUtf8Coder.of()));
sideOutput.apply(
TextIO.Write.named("writingSide").to(GCS_BUCKET + "example/side-output/side").withCoder(StringUtf8Coder.of()));
// 6. Run pipeline.
pipeline.run();
}
static class Filter extends DoFn<String, String> {
private static final long serialVersionUID = 0;
final TupleTag<String> sideTag;
String keyword;
public Filter(String keyword, TupleTag<String> sideTag) {
this.sideTag = sideTag;
this.keyword = keyword;
}
@Override
public void processElement(ProcessContext c) throws Exception {
String profile = c.element();
if (profile.contains(keyword)) {
c.output(profile);
} else {
c.sideOutput(sideTag, profile);
}
}
}
}
这是我使用的命令,也是我得到的 error/exception(它只包含一些我们用于数据流包的命令行参数,这里没有什么特别的,只是为了给你一个想法):
dataflow-20161003.R3$ ./bin/dataflow --job=test-experimental-sideoutput --numWorkers=1 --date=0001-01-01
Oct 04, 2016 12:37:34 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 121 files. Enable logging at DEBUG level to see which files will be staged.
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize tmp.dataflow.experimental.TestSideOutput$Filter@6986852
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91)
at com.google.cloud.dataflow.sdk.transforms.ParDo$BoundMulti.<init>(ParDo.java:959)
at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:912)
at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:908)
at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:41)
at com.moloco.dataflow.Main.main(Main.java:152)
Caused by: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
... 6 more
此外,我认为这无关紧要,但是 'TestOptions' class 的代码:
package tmp.dataflow.experimental;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.Validation;
public interface TestOptions extends DataflowPipelineOptions {
@Description("Job")
@Validation.Required
String getJob();
void setJob(String value);
@Description("Job suffix")
String getJobSuffix();
void setJobSuffix(String value);
@Description("Date")
@Validation.Required
String getDate();
void setDate(String value);
}
最后,如果我在实例化 TupleTags 时删除大括号“{}”,我会得到以下异常(并且我在 Whosebug 上发现我应该用“{}”实例化它们以避免这种情况的建议问题类型):
Oct 04, 2016 12:43:56 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 122 files. Enable logging at DEBUG level to see which files will be staged.
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for FilterByKeyword.out1 [PCollection]. Correct one of the following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. If this error occurs for a side output of the producing ParDo, verify that the TupleTag for this output is constructed with proper type information (see TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.
Using the default output Coder from the producing PTransform failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure.
at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195)
at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137)
at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:50)
at com.moloco.dataflow.Main.main(Main.java:152)
编辑:请参阅下面的答案,通过执行 execute() 'static'.
来解决此问题下面的代码类似于我最初 post 编写的代码,但有两处更改: 只要有可能,我都会为每个 PCollection 再次明确(和冗余)指定 'coder'。此外,在实例化 TupleTags 时,没有花括号。
注意确定哪种方法(静态方法与这种冗余方法)更合适。
public void execute() {
Pipeline pipeline = Pipeline.create(options);
// 1. Read sample data.
PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading")
.from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of()));
// 2. Create tags for outputs.
final TupleTag<String> mainTag = new TupleTag<String>();
final TupleTag<String> sideTag = new TupleTag<String>();
// 3. Apply ParDo with side output tags.
Filter filter = new Filter("DATAFLOW", sideTag);
PCollectionTuple results = profiles.setCoder(StringUtf8Coder.of())
.apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter));
// 4. Retrieve outputs.
PCollection<String> mainOutput = results.get(mainTag);
PCollection<String> sideOutput = results.get(sideTag);
// 5. Write to GCS.
mainOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingMain")
.to(GCS_BUCKET + "example/main-output-from-nonstatic/main").withCoder(StringUtf8Coder.of()));
sideOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingSide")
.to(GCS_BUCKET + "example/side-output-from-nonstatic/side").withCoder(StringUtf8Coder.of()));
// 6. Run pipeline.
pipeline.run();
}
您遇到的错误是因为您的 Filter
fn 引用了 TupleTag
,而后者又(因为它是从非静态实例化的非静态匿名 class函数 execute()
) 引用封闭的 TestSideOutput
.
所以管道正在尝试序列化 TestSideOutput
对象,但它是不可序列化的 - 如消息所示:java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput
.
根本原因是方法execute()
不是静态的。将其设置为静态应该可以解决问题。