GCP: Replacing the old Dataflow withCoder function in Apache Beam?
I am reading JSON from storage and dumping it into BigQuery. For that, I wrote the following:
PCollectionTuple collectionTuple = p
    .apply(TextIO.Read.named("Read Input Files : " + dataFeedCode).from(file)
        .withCoder(TableRowJsonCoder.of()))
    .apply(ParDo.named("Perform Transformation")
        .of(performTransformation(fieldMappings, inValidRecords, actualFileName))
        .withOutputTags(validRecords, TupleTagList.of(inValidRecords)));
Now I have to convert this code to the Apache Beam implementation, so I wrote the following code:
PCollectionTuple collectionTuple = p.apply(TextIO.read().from(file))
    .apply(MapElements.via(new ParseTableRowJson()))
    .apply(ParDo.of(performTransformation(fieldMappings, inValidRecords, actualFileName))
        .withOutputTags(validRecords, TupleTagList.of(inValidRecords)));
static class ParseTableRowJson extends SimpleFunction<String, TableRow> {
  @Override
  public TableRow apply(String input) {
    try {
      // return Transport.getJsonFactory().fromString(input, TableRow.class);
      return TableRowJsonCoder.of().decode(
          new ByteArrayInputStream(
              CharStreams.toString(CharSource.wrap(input).openStream()).getBytes()));
    } catch (IOException e) {
      throw new RuntimeException("Failed parsing table row json", e);
    }
  }
}
Since withCoder is not available in Apache Beam, I am struggling a bit. I am getting the following error:
at com.morrisons.datafeed.dataflow.pipeline.DataFeedWriteToBigQueryPipeline$ParseTableRowJson.apply(DataFeedWriteToBigQueryPipeline.java:302)
at com.morrisons.datafeed.dataflow.pipeline.DataFeedWriteToBigQueryPipeline$ParseTableRowJson.apply(DataFeedWriteToBigQueryPipeline.java:1)
at org.apache.beam.sdk.transforms.Contextful.lambda$fnbf234f(Contextful.java:112)
at org.apache.beam.sdk.transforms.MapElements.processElement(MapElements.java:129)
at org.apache.beam.sdk.transforms.MapElements$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
at com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:383)
at com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:355)
at com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:286)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:134)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:101)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:63)
at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:106)
... 25 more
Thanks in advance for your support.
You are already using MapElements and ParDo as the Migrating from Cloud Dataflow SDK 1.x guide suggests. You could then try making ParseTableRowJson a DoFn instead of a SimpleFunction, for example by converting the MapElements step into a ParDo.
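For illustration, here is a minimal, untested sketch of that DoFn (the class name ParseTableRowJsonFn is assumed, not from your code). The key change is decoding with Coder.Context.OUTER: the single-argument decode(InputStream) expects Beam's length-prefixed nested encoding rather than plain JSON text, which is consistent with the EOFException thrown from StringUtf8Coder in your trace.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.DoFn;
import com.google.api.services.bigquery.model.TableRow;

// Sketch of a DoFn replacement for ParseTableRowJson (name assumed).
static class ParseTableRowJsonFn extends DoFn<String, TableRow> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    try (InputStream in =
        new ByteArrayInputStream(c.element().getBytes(StandardCharsets.UTF_8))) {
      // Context.OUTER treats the whole stream as a single value (raw JSON).
      // decode(InputStream) alone expects a length prefix and throws
      // EOFException on plain JSON lines.
      c.output(TableRowJsonCoder.of().decode(in, Context.OUTER));
    } catch (IOException e) {
      throw new RuntimeException("Failed parsing table row json", e);
    }
  }
}

You would then wire it in with ParDo instead of MapElements:

.apply(ParDo.of(new ParseTableRowJsonFn()))

Alternatively, the commented-out Transport.getJsonFactory().fromString(input, TableRow.class) line in your own code may also work, since it parses the JSON directly and avoids the coder context issue entirely.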