Apache Beam - 跳过流水线步骤
Apache Beam - skip pipeline step
我正在使用 Apache Beam 设置一个包含 2 个主要步骤的管道:
- 使用 Beam Transform 转换数据
- 将转换后的数据加载到 BigQuery
管道设置如下所示:
myPCollection = (org.apache.beam.sdk.values.PCollection<myCollectionObjectType>)myInputPCollection
.apply("do a parallel transform"),
ParDo.of(new MyTransformClassName.MyTransformFn()));
myPCollection
.apply("Load BigQuery data for PCollection",
BigQueryIO.<myCollectionObjectType>write()
.to(new MyDataLoadClass.MyFactTableDestination(myDestination))
.withFormatFunction(new MyDataLoadClass.MySerializationFn())
我看过这个问题:
这表明在步骤 1 中的并行转换之后,我可能能够以某种方式动态更改我可以将数据传递到的输出。
我该怎么做?我不知道如何选择是否将myPCollection
从第1步传递到第2步。如果第1步myPCollection
中的对象是null
,我需要跳过第2步。
当你在下一步中不需要它时,你只是不从你的 MyTransformClassName.MyTransformFn
发出元素,例如这样的东西:
class MyTransformClassName.MyTransformFn extends...
@ProcessElement
public void processElement(ProcessContext c, ...) {
...
result = ...
if (result != null) {
c.output(result); //only output something that's not null
}
}
这样 null 就不会到达下一步。
有关详细信息,请参阅指南的 ParDo
部分:https://beam.apache.org/documentation/programming-guide/#pardo
我正在使用 Apache Beam 设置一个包含 2 个主要步骤的管道:
- 使用 Beam Transform 转换数据
- 将转换后的数据加载到 BigQuery
管道设置如下所示:
myPCollection = (org.apache.beam.sdk.values.PCollection<myCollectionObjectType>)myInputPCollection
.apply("do a parallel transform"),
ParDo.of(new MyTransformClassName.MyTransformFn()));
myPCollection
.apply("Load BigQuery data for PCollection",
BigQueryIO.<myCollectionObjectType>write()
.to(new MyDataLoadClass.MyFactTableDestination(myDestination))
.withFormatFunction(new MyDataLoadClass.MySerializationFn())
我看过这个问题:
这表明在步骤 1 中的并行转换之后,我可能能够以某种方式动态更改我可以将数据传递到的输出。
我该怎么做?我不知道如何选择是否将myPCollection
从第1步传递到第2步。如果第1步myPCollection
中的对象是null
,我需要跳过第2步。
当你在下一步中不需要它时,你只是不从你的 MyTransformClassName.MyTransformFn
发出元素,例如这样的东西:
class MyTransformClassName.MyTransformFn extends...
@ProcessElement
public void processElement(ProcessContext c, ...) {
...
result = ...
if (result != null) {
c.output(result); //only output something that's not null
}
}
这样 null 就不会到达下一步。
有关详细信息,请参阅指南的 ParDo
部分:https://beam.apache.org/documentation/programming-guide/#pardo