Google Cloud Dataflow:提交的作业正在执行但使用的是旧代码
Google Cloud Dataflow: Submitted job is executing but using old code
我正在编写一个应该做 3 件事的数据流管道:
- 正在从 GCP 存储读取 .csv 文件
- 正在将数据解析为 BigQuery 兼容的 TableRows
- 正在将数据写入 BigQuery table
到目前为止,这一切都非常有效。它仍然如此,但是当我更改源变量和目标变量时,没有任何变化。实际上 运行s 的工作是旧的,而不是最近更改(和提交)的代码。不知何故,当我 运行 使用 BlockingDataflowPipelineRunner 来自 Eclipse 的代码时,代码本身并未上传,但使用了旧版本。
代码通常没有问题,但要尽可能完整:
public class BatchPipeline {
String source = "gs://sourcebucket/*.csv";
String destination = "projectID:datasetID.testing1";
//Creation of the pipeline with default arguments
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<String> line = p.apply(TextIO.Read.named("ReadFromCloudStorage")
.from(source));
@SuppressWarnings("serial")
PCollection<TableRow> tablerows = line.apply(ParDo.named("ParsingCSVLines").of(new DoFn<String, TableRow>(){
@Override
public void processElement(ProcessContext c){
//processing code goes here
}
}));
//Defining the BigQuery table scheme
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("datetime").setType("TIMESTAMP").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("consumption").setType("FLOAT").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("meterID").setType("STRING").setMode("REQUIRED"));
TableSchema schema = new TableSchema().setFields(fields);
String table = destination;
tablerows.apply(BigQueryIO.Write
.named("BigQueryWrite")
.to(table)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withoutValidation());
//Runs the pipeline
p.run();
}
出现这个问题是因为我刚刚更换了笔记本电脑并且不得不重新配置所有内容。我正在开发一个干净的 Ubuntu 16.04 LTS OS,其中安装了 GCP 开发的所有依赖项(通常)。通常一切都配置得很好,因为我能够开始工作(如果我的配置有误,这应该是不可能的,对吧?)。我正在使用 Eclipse Neon 顺便说一句。
那么问题出在哪里呢?在我看来,上传代码时出现问题,但我已确保我的云 git 存储库是最新的并且暂存存储桶已被清理...
**** 更新 ****
我从来没有发现到底出了什么问题,但是当我检查部署的 jar 中文件的创建日期时,我确实看到它们从未真正更新过。然而,jar 文件本身有一个最近的时间戳,这让我完全忽略了这个问题(菜鸟错误)。
我最终通过简单地在 Eclipse 中创建一个新的 Dataflow 项目并将我的 .java 文件从损坏的项目复制到新项目中,使它重新工作。从那时起一切都像魅力一样。
提交 Dataflow 作业后,您可以通过检查作为作业描述一部分的文件来检查哪些工件是作业规范的一部分,这些文件可通过 DataflowPipelineWorkerPoolOptions#getFilesToStage 获得。下面的代码片段提供了有关如何获取此信息的一些示例。
PipelineOptions myOptions = ...
myOptions.setRunner(DataflowPipelineRunner.class);
Pipeline p = Pipeline.create(myOptions);
// Build up your pipeline and run it.
p.apply(...)
p.run();
// At this point in time, the files which were staged by the
// DataflowPipelineRunner will have been populated into the
// DataflowPipelineWorkerPoolOptions#getFilesToStage
List<String> stagedFiles = myOptions.as(DataflowPipelineWorkerPoolOptions.class).getFilesToStage();
for (String stagedFile : stagedFiles) {
System.out.println(stagedFile);
}
上面的代码应该打印出如下内容:
/my/path/to/file/dataflow.jar
/another/path/to/file/myapplication.jar
/a/path/to/file/alibrary.jar
您上传的作业的资源部分可能在某种程度上已经过时,其中包含您的旧代码。查看暂存列表的所有目录和 jar 部分,找到 BatchPipeline
的所有实例并验证它们的年龄。 jar
文件可以使用 jar tool or any zip
file reader. Alternatively use javap or any other class file inspector 来提取,以验证 BatchPipeline
class 文件是否符合您所做的预期更改。
我正在编写一个应该做 3 件事的数据流管道:
- 正在从 GCP 存储读取 .csv 文件
- 正在将数据解析为 BigQuery 兼容的 TableRows
- 正在将数据写入 BigQuery table
到目前为止,这一切都非常有效。它仍然如此,但是当我更改源变量和目标变量时,没有任何变化。实际上 运行s 的工作是旧的,而不是最近更改(和提交)的代码。不知何故,当我 运行 使用 BlockingDataflowPipelineRunner 来自 Eclipse 的代码时,代码本身并未上传,但使用了旧版本。
代码通常没有问题,但要尽可能完整:
public class BatchPipeline {
String source = "gs://sourcebucket/*.csv";
String destination = "projectID:datasetID.testing1";
//Creation of the pipeline with default arguments
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<String> line = p.apply(TextIO.Read.named("ReadFromCloudStorage")
.from(source));
@SuppressWarnings("serial")
PCollection<TableRow> tablerows = line.apply(ParDo.named("ParsingCSVLines").of(new DoFn<String, TableRow>(){
@Override
public void processElement(ProcessContext c){
//processing code goes here
}
}));
//Defining the BigQuery table scheme
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("datetime").setType("TIMESTAMP").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("consumption").setType("FLOAT").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("meterID").setType("STRING").setMode("REQUIRED"));
TableSchema schema = new TableSchema().setFields(fields);
String table = destination;
tablerows.apply(BigQueryIO.Write
.named("BigQueryWrite")
.to(table)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withoutValidation());
//Runs the pipeline
p.run();
}
出现这个问题是因为我刚刚更换了笔记本电脑并且不得不重新配置所有内容。我正在开发一个干净的 Ubuntu 16.04 LTS OS,其中安装了 GCP 开发的所有依赖项(通常)。通常一切都配置得很好,因为我能够开始工作(如果我的配置有误,这应该是不可能的,对吧?)。我正在使用 Eclipse Neon 顺便说一句。
那么问题出在哪里呢?在我看来,上传代码时出现问题,但我已确保我的云 git 存储库是最新的并且暂存存储桶已被清理...
**** 更新 ****
我从来没有发现到底出了什么问题,但是当我检查部署的 jar 中文件的创建日期时,我确实看到它们从未真正更新过。然而,jar 文件本身有一个最近的时间戳,这让我完全忽略了这个问题(菜鸟错误)。
我最终通过简单地在 Eclipse 中创建一个新的 Dataflow 项目并将我的 .java 文件从损坏的项目复制到新项目中,使它重新工作。从那时起一切都像魅力一样。
提交 Dataflow 作业后,您可以通过检查作为作业描述一部分的文件来检查哪些工件是作业规范的一部分,这些文件可通过 DataflowPipelineWorkerPoolOptions#getFilesToStage 获得。下面的代码片段提供了有关如何获取此信息的一些示例。
PipelineOptions myOptions = ...
myOptions.setRunner(DataflowPipelineRunner.class);
Pipeline p = Pipeline.create(myOptions);
// Build up your pipeline and run it.
p.apply(...)
p.run();
// At this point in time, the files which were staged by the
// DataflowPipelineRunner will have been populated into the
// DataflowPipelineWorkerPoolOptions#getFilesToStage
List<String> stagedFiles = myOptions.as(DataflowPipelineWorkerPoolOptions.class).getFilesToStage();
for (String stagedFile : stagedFiles) {
System.out.println(stagedFile);
}
上面的代码应该打印出如下内容:
/my/path/to/file/dataflow.jar
/another/path/to/file/myapplication.jar
/a/path/to/file/alibrary.jar
您上传的作业的资源部分可能在某种程度上已经过时,其中包含您的旧代码。查看暂存列表的所有目录和 jar 部分,找到 BatchPipeline
的所有实例并验证它们的年龄。 jar
文件可以使用 jar tool or any zip
file reader. Alternatively use javap or any other class file inspector 来提取,以验证 BatchPipeline
class 文件是否符合您所做的预期更改。