外部 api 在 Apache Beam 数据流中调用
external api call in apache beam dataflow
我有一个用例,我读取存储在 google 云存储中的换行符 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用一个外部 API 来进行重复数据删除,无论该 json 元素之前是否被发现。我正在做一个 ParDo
,每个 json 上有一个 DoFn
。
我还没有看到任何在线教程说明如何从 apache beam DoFn
Dataflow 调用外部 API 端点。
我正在使用 Beam 的 JAVA
SDK。我学习的一些教程解释说使用 startBundle
和 FinishBundle
但我不清楚如何使用它
下面的博客 post 中有一个使用有状态 DoFn 批量调用外部系统的示例:https://beam.apache.org/blog/2017/08/28/timely-processing.html,可能会有帮助。
如果您需要为每个 JSON 记录检查外部存储中的重复项,那么您仍然可以使用 DoFn
。有几个注释,如 @Setup
、@StartBundle
、@FinishBundle
等,可用于注释 DoFn
中的方法。
例如,如果您需要实例化一个客户端对象来向您的外部数据库发送请求,那么您可能希望在 @Setup
方法(如 POJO 构造函数)中执行此操作,然后在你的 @ProcessElement
方法。
让我们考虑一个简单的例子:
static class MyDoFn extends DoFn<Record, Record> {
static transient MyClient client;
@Setup
public void setup() {
client = new MyClient("host");
}
@ProcessElement
public void processElement(ProcessContext c) {
// process your records
Record r = c.element();
// check record ID for duplicates
if (!client.isRecordExist(r.id()) {
c.output(r);
}
}
@Teardown
public void teardown() {
if (client != null) {
client.close();
client = null;
}
}
}
此外,为了避免对每条记录进行远程调用,您可以将记录批处理打包到内部缓冲区(Beam 将输入数据拆分成包)并以批处理模式检查重复项(如果您的客户端支持此功能)。为此,您可以使用 @StartBundle
和 @FinishBundle
注释方法,这些方法将在相应地处理 Beam bundle 之前和之后调用。
对于更复杂的示例,我建议查看不同 Beam IOs 中的 Sink 实现,例如 KinesisIO。
我有一个用例,我读取存储在 google 云存储中的换行符 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用一个外部 API 来进行重复数据删除,无论该 json 元素之前是否被发现。我正在做一个 ParDo
,每个 json 上有一个 DoFn
。
我还没有看到任何在线教程说明如何从 apache beam DoFn
Dataflow 调用外部 API 端点。
我正在使用 Beam 的 JAVA
SDK。我学习的一些教程解释说使用 startBundle
和 FinishBundle
但我不清楚如何使用它
下面的博客 post 中有一个使用有状态 DoFn 批量调用外部系统的示例:https://beam.apache.org/blog/2017/08/28/timely-processing.html,可能会有帮助。
如果您需要为每个 JSON 记录检查外部存储中的重复项,那么您仍然可以使用 DoFn
。有几个注释,如 @Setup
、@StartBundle
、@FinishBundle
等,可用于注释 DoFn
中的方法。
例如,如果您需要实例化一个客户端对象来向您的外部数据库发送请求,那么您可能希望在 @Setup
方法(如 POJO 构造函数)中执行此操作,然后在你的 @ProcessElement
方法。
让我们考虑一个简单的例子:
static class MyDoFn extends DoFn<Record, Record> {
static transient MyClient client;
@Setup
public void setup() {
client = new MyClient("host");
}
@ProcessElement
public void processElement(ProcessContext c) {
// process your records
Record r = c.element();
// check record ID for duplicates
if (!client.isRecordExist(r.id()) {
c.output(r);
}
}
@Teardown
public void teardown() {
if (client != null) {
client.close();
client = null;
}
}
}
此外,为了避免对每条记录进行远程调用,您可以将记录批处理打包到内部缓冲区(Beam 将输入数据拆分成包)并以批处理模式检查重复项(如果您的客户端支持此功能)。为此,您可以使用 @StartBundle
和 @FinishBundle
注释方法,这些方法将在相应地处理 Beam bundle 之前和之后调用。
对于更复杂的示例,我建议查看不同 Beam IOs 中的 Sink 实现,例如 KinesisIO。