从 Dataflow 2.5.0 SDK 迁移到 Beam 2.13 版本

Migrating from Dataflow 2.5.0 SDK to Beam 2.13 release

我收到一条错误消息,指出 Dataflow 2.5 (Java) 是最后一个受支持的版本,我应该使用 Beam。有迁移指南吗?我可以找到从 1.x 到 2.x 的数据流,但找不到到 Beam 的数据流。

例如,如果您使用 Beam 文档中建议的 maven 原型,则似乎不会安装 DataflowPipelineOptions。

具体来说: import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions 当我使用使用以下方法生成的 pom.xml 时找不到:

mvn archetype:generate \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-starter \
      -DarchetypeVersion=2.13.0 \
      -DgroupId=com.myexample \
      -DartifactId=newpackage \
      -Dversion="1.1" \
      -DinteractiveMode=false

即使在添加之后:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>2.13.0</version>
  <scope>runtime</scope>
</dependency>

到生成的pom.xml.

这个blog post可能会有帮助。这里的一位用户描述了他们的迁移。

我相信包重命名(com.google.cloud.dataflow 到 org.apache.beam)和新的 class/method 签名已经完成,如果你在 Dataflow 2.x SDKs

所以我认为在这种情况下迁移应该很简单。请尝试删除 Dataflow SDK 并引入 org.apache.beam on the lastest version。它可能无需修改即可工作。您也可以先尝试在 2.5 上使用 org.apache.beam。然后升级到2.13,看看是否也顺利。

您的 pom.xml 中需要一些额外的 Google 云依赖项,以便 运行 您的 Beam 管道在 Dataflow 上。添加后对我有用:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>${beam.version}</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
  <exclusions>
    <exclusion>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
  <version>${beam.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-protobuf</artifactId>
  <version>${beam.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-jdbc</artifactId>
  <version>${beam.version}</version>
</dependency>

此外,您可能需要向启动脚本添加更多参数。我必须添加:

gcpTempLocation=gs://$BUCKET/tmp