使用 ParDo 的 processElement 使用 REST API
Consuming REST APIs using ParDo's processElement
虽然我正在破解一个快速的 CSV 到 Firebase 上传,但我只是这样做了,而不是编写自定义接收器。这是对代码的过度简化:
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<String> CsvData = p.apply(TextIO.Read.from("/my_file.csv"));
CsvData.apply(ParDo.named("Firebase").of(new DoFn<String, Void>() {
@Override
public void processElement(ProcessContext c) {
Firebase fb = new Firebase("https://MYAPP.firebaseio.com/");
fb.child("someId").setValue(c.element.getValue());
}
});
}
有效。这是应该在 Cloud Dataflow 上使用 REST API 的地方吗?
是的,这应该有效,假设您同意以下警告:如果失败,捆绑包可能会被复制或重试多次,即您的 processElement
调用可能会在同一时间被调用元素多次,可能同时发生。
即使 Dataflow 会对结果进行重复数据删除(即只有通过 c.output()
发出的一个成功调用的项目最终会出现在结果 PCollection
中),但会删除重复的副作用(例如制作外部 API 调用)是您的代码的责任。
自定义接收器 API 只是强调了这些问题并提供了一个 "pattern" 处理它们的方法(通过提供具有唯一 ID 的捆绑包并提供一个钩子来提交成功的结果 - 例如基于文件的sinks 会将每个包写入一个唯一命名的临时文件,提交挂钩会将成功完成的包写入的文件重命名为最终位置) - 但如果您的用例对它们不敏感,那么您可以很好地使用一个简单的 ParDo
.
此外,请注意 Dataflow 还没有用于流式传输的自定义接收器 API,因此如果这是流式传输管道,那么 ParDo
绝对是正确的选择。
在您的 ParDo 中,您可能希望对 Firebase 调用进行批处理以避免每次调用的开销。您可以使用 DoFn.finishBundle()
来做到这一点(即在缓冲区中维护批量更新,在 processElement
中追加它,并在它变得太大时刷新它,最后一次在 finishBundle
中刷新)。请参阅 .
中类似模式的示例
虽然我正在破解一个快速的 CSV 到 Firebase 上传,但我只是这样做了,而不是编写自定义接收器。这是对代码的过度简化:
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<String> CsvData = p.apply(TextIO.Read.from("/my_file.csv"));
CsvData.apply(ParDo.named("Firebase").of(new DoFn<String, Void>() {
@Override
public void processElement(ProcessContext c) {
Firebase fb = new Firebase("https://MYAPP.firebaseio.com/");
fb.child("someId").setValue(c.element.getValue());
}
});
}
有效。这是应该在 Cloud Dataflow 上使用 REST API 的地方吗?
是的,这应该有效,假设您同意以下警告:如果失败,捆绑包可能会被复制或重试多次,即您的 processElement
调用可能会在同一时间被调用元素多次,可能同时发生。
即使 Dataflow 会对结果进行重复数据删除(即只有通过 c.output()
发出的一个成功调用的项目最终会出现在结果 PCollection
中),但会删除重复的副作用(例如制作外部 API 调用)是您的代码的责任。
自定义接收器 API 只是强调了这些问题并提供了一个 "pattern" 处理它们的方法(通过提供具有唯一 ID 的捆绑包并提供一个钩子来提交成功的结果 - 例如基于文件的sinks 会将每个包写入一个唯一命名的临时文件,提交挂钩会将成功完成的包写入的文件重命名为最终位置) - 但如果您的用例对它们不敏感,那么您可以很好地使用一个简单的 ParDo
.
此外,请注意 Dataflow 还没有用于流式传输的自定义接收器 API,因此如果这是流式传输管道,那么 ParDo
绝对是正确的选择。
在您的 ParDo 中,您可能希望对 Firebase 调用进行批处理以避免每次调用的开销。您可以使用 DoFn.finishBundle()
来做到这一点(即在缓冲区中维护批量更新,在 processElement
中追加它,并在它变得太大时刷新它,最后一次在 finishBundle
中刷新)。请参阅