如何使用 Apache Beam 传播 PubSub 元数据?
How to propagate PubSub metadata with Apache Beam?
上下文:我有一个监听 pub sub 的管道,发送到 pubsub 的消息是由来自 google 云存储的对象更改通知发布的。管道使用 XmlIO 拆分文件来处理文件,到目前为止一切顺利。
问题是:在 pubsub 消息中(以及存储在 google 云存储中的对象中)我有一些元数据,我想将其与 XmlIO 中的数据合并以组成元素管道将处理,我该如何实现?
您可以直接使用来自Google Cloud Storage 的pub/sub 通知,而不是在中间引入OCN。
Google也建议使用pub/sub。如果您收到 pub/sub 通知,您可以在其中获取消息属性。
data = request.get_json()
object_id = data['message']['attributes']['objectGeneration']
bucket_name = data['message']['attributes']['bucketId']
object_name = data['message']['attributes']['objectId']
您可以创建一个自定义 window 和 windowfn 来存储来自 pubsub 消息的元数据,您以后要使用这些元数据来丰富各个记录。
您的管道将如下所示:
ReadFromPubsub -> Window.into(CopyMetadataToCustomWindowFn) -> ParDo(ExtractFilenameFromPubsubMessage) -> XmlIO -> ParDo(EnrichRecordsWithWindowMetadata) -> Window.into(FixedWindows.of(...))
首先,您需要创建 IntervalWindow that stores the metadata that you need. After that, create a subclass of WindowFn where in #assignWindows(...) you copy the metadata from the pubsub message into the IntervalWindow subclass you created. Apply your new windowfn using the Window.into(...) 转换的子类。现在,流经 XmlIO 转换的每条记录都将位于包含元数据的自定义 windowfn 中。
对于第二步,您需要从 pubsub 消息中提取相关文件名以作为输入传递给 XmlIO 转换。
对于第三步,您想要从 XmlIO 之后的 ParDo/DoFn 中的 window 中提取自定义元数据。 XmlIO 中的记录将保留通过它传递的 windowing 信息(请注意,并非所有转换都这样做,但几乎所有转换都这样做)。您可以声明您的 DoFn needs the window to be passed to your @ProcessElement,例如:
class EnrichRecordsWithWindowMetadata extends DoFn<...> {
@ProcessElement
public void processElement(@Element XmlRecord xmlRecord, MyCustomMetadataWindow metadataWindow) {
... enrich record with metadata on window ...
}
}
最后,最好恢复到标准 windowfns 之一,例如 FixedWindows,因为 window 上的元数据不再相关。
上下文:我有一个监听 pub sub 的管道,发送到 pubsub 的消息是由来自 google 云存储的对象更改通知发布的。管道使用 XmlIO 拆分文件来处理文件,到目前为止一切顺利。
问题是:在 pubsub 消息中(以及存储在 google 云存储中的对象中)我有一些元数据,我想将其与 XmlIO 中的数据合并以组成元素管道将处理,我该如何实现?
您可以直接使用来自Google Cloud Storage 的pub/sub 通知,而不是在中间引入OCN。
Google也建议使用pub/sub。如果您收到 pub/sub 通知,您可以在其中获取消息属性。
data = request.get_json()
object_id = data['message']['attributes']['objectGeneration']
bucket_name = data['message']['attributes']['bucketId']
object_name = data['message']['attributes']['objectId']
您可以创建一个自定义 window 和 windowfn 来存储来自 pubsub 消息的元数据,您以后要使用这些元数据来丰富各个记录。
您的管道将如下所示:
ReadFromPubsub -> Window.into(CopyMetadataToCustomWindowFn) -> ParDo(ExtractFilenameFromPubsubMessage) -> XmlIO -> ParDo(EnrichRecordsWithWindowMetadata) -> Window.into(FixedWindows.of(...))
首先,您需要创建 IntervalWindow that stores the metadata that you need. After that, create a subclass of WindowFn where in #assignWindows(...) you copy the metadata from the pubsub message into the IntervalWindow subclass you created. Apply your new windowfn using the Window.into(...) 转换的子类。现在,流经 XmlIO 转换的每条记录都将位于包含元数据的自定义 windowfn 中。
对于第二步,您需要从 pubsub 消息中提取相关文件名以作为输入传递给 XmlIO 转换。
对于第三步,您想要从 XmlIO 之后的 ParDo/DoFn 中的 window 中提取自定义元数据。 XmlIO 中的记录将保留通过它传递的 windowing 信息(请注意,并非所有转换都这样做,但几乎所有转换都这样做)。您可以声明您的 DoFn needs the window to be passed to your @ProcessElement,例如:
class EnrichRecordsWithWindowMetadata extends DoFn<...> {
@ProcessElement
public void processElement(@Element XmlRecord xmlRecord, MyCustomMetadataWindow metadataWindow) {
... enrich record with metadata on window ...
}
}
最后,最好恢复到标准 windowfns 之一,例如 FixedWindows,因为 window 上的元数据不再相关。