使用 ParDo 的 processElement 使用 REST API

Consuming REST APIs using ParDo's processElement

虽然我正在破解一个快速的 CSV 到 Firebase 上传,但我只是这样做了,而不是编写自定义接收器。这是对代码的过度简化:

public static void main(String[] args) throws Exception {

    Options options = PipelineOptionsFactory.as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> CsvData = p.apply(TextIO.Read.from("/my_file.csv"));
    CsvData.apply(ParDo.named("Firebase").of(new DoFn<String, Void>() {
          @Override
          public void processElement(ProcessContext c) {
              Firebase fb = new Firebase("https://MYAPP.firebaseio.com/");
              fb.child("someId").setValue(c.element.getValue());
          }
        });

}

有效。这是应该在 Cloud Dataflow 上使用 REST API 的地方吗?

是的,这应该有效,假设您同意以下警告:如果失败,捆绑包可能会被复制或重试多次,即您的 processElement 调用可能会在同一时间被调用元素多次,可能同时发生。

即使 Dataflow 会对结果进行重复数据删除(即只有通过 c.output() 发出的一个成功调用的项目最终会出现在结果 PCollection 中),但会删除重复的副作用(例如制作外部 API 调用)是您的代码的责任。

自定义接收器 API 只是强调了这些问题并提供了一个 "pattern" 处理它们的方法(通过提供具有唯一 ID 的捆绑包并提供一个钩子来提交成功的结果 - 例如基于文件的sinks 会将每个包写入一个唯一命名的临时文件,提交挂钩会将成功完成的包写入的文件重命名为最终位置) - 但如果您的用例对它们不敏感,那么您可以很好地使用一个简单的 ParDo.

此外,请注意 Dataflow 还没有用于流式传输的自定义接收器 API,因此如果这是流式传输管道,那么 ParDo 绝对是正确的选择。

在您的 ParDo 中,您可能希望对 Firebase 调用进行批处理以避免每次调用的开销。您可以使用 DoFn.finishBundle() 来做到这一点(即在缓冲区中维护批量更新,在 processElement 中追加它,并在它变得太大时刷新它,最后一次在 finishBundle 中刷新)。请参阅 .

中类似模式的示例