外部 api 在 Apache Beam 数据流中调用

external api call in apache beam dataflow

我有一个用例,我读取存储在 google 云存储中的换行符 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用一个外部 API 来进行重复数据删除,无论该 json 元素之前是否被发现。我正在做一个 ParDo,每个 json 上有一个 DoFn

我还没有看到任何在线教程说明如何从 apache beam DoFn Dataflow 调用外部 API 端点。

我正在使用 Beam 的 JAVA SDK。我学习的一些教程解释说使用 startBundleFinishBundle 但我不清楚如何使用它

下面的博客 post 中有一个使用有状态 DoFn 批量调用外部系统的示例:https://beam.apache.org/blog/2017/08/28/timely-processing.html,可能会有帮助。

如果您需要为每个 JSON 记录检查外部存储中的重复项,那么您仍然可以使用 DoFn。有几个注释,如 @Setup@StartBundle@FinishBundle 等,可用于注释 DoFn 中的方法。

例如,如果您需要实例化一个客户端对象来向您的外部数据库发送请求,那么您可能希望在 @Setup 方法(如 POJO 构造函数)中执行此操作,然后在你的 @ProcessElement 方法。

让我们考虑一个简单的例子:

static class MyDoFn extends DoFn<Record, Record> {

    static transient MyClient client;

    @Setup
    public void setup() {
        client = new MyClient("host");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // process your records
        Record r = c.element();
        // check record ID for duplicates
        if (!client.isRecordExist(r.id()) {
            c.output(r);
        }
    }

    @Teardown
    public void teardown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}

此外,为了避免对每条记录进行远程调用,您可以将记录批处理打包到内部缓冲区(Beam 将输入数据拆分成包)并以批处理模式检查重复项(如果您的客户端支持此功能)。为此,您可以使用 @StartBundle@FinishBundle 注释方法,这些方法将在相应地处理 Beam bundle 之前和之后调用。

对于更复杂的示例,我建议查看不同 Beam IOs 中的 Sink 实现,例如 KinesisIO