如何在 python beam 中制作通用的 Protobuf Parser DoFn?
How to make a generic Protobuf Parser DoFn in python beam?
上下文
我正在使用在 pubsub 中具有 protobuf 数据源的流式传输管道。我希望将此 protobuf 解析为 python 字典,因为数据接收器要求输入是字典的集合。我通过在 DoFn 的 process
函数中初始化 protobuf 消息成功开发了 Protobuf 解析器。
为什么需要通用 Protobuf 解析器
但是,我想知道,是否可以在 Beam 上制作一个通用的 ProtobufParser DoFn?从工程角度来看,通用 DoFn 很有用,可以避免重新实现现有功能并实现代码重用。在 Java 中,我知道我们可以使用泛型,因此在 Java 中实现这个泛型 ProtobufParser 相对容易。由于 Python 函数是 first-class 对象,我在考虑是否可以将 Protobuf 模式 class (不是消息实例对象)传递到 DoFn 中。我尝试这样做,但我一直失败。
成功的解析器,但警告:不可推广
下面是我目前成功的 protobuf 解析器。 protobuf 消息在 process
函数中初始化。
class ParsePubSubProtoToDict(beam.DoFn):
def process(self, element, *args, **kwargs):
from datapipes.protos.data_pb2 import DataSchema
from google.protobuf.json_format import MessageToDict
message = DataSchema()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
虽然上述 Protobuf DoFn 解析器可以正常工作,但它并未推广到所有 protobuf 模式,因此这将导致需要为不同的 protobuf 模式重新实现新的 DoFn 解析器。
我的尝试
为了使解析器对所有 protobuf 模式通用,我尝试将 protobuf 模式(在 Python 中作为 class 生成)传递给 DoFn。
class ParsePubSubProtoToDict(beam.DoFn):
def __init__(self, proto_class):
self.proto_class = proto_class
def process(self, element, *args, **kwargs):
from google.protobuf.json_format import MessageToDict
message = self.proto_class()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
def run_pubsub_to_gbq_pipeline(argv):
...
from datapipes.protos import data_pb2
with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
'Print Result' >> beam.Map(lambda x: print_data(x))
和其他类似技术,但是,我的所有尝试都失败了,并显示相同的错误消息:
pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema
从这个错误消息中,我对问题发生的原因有两个假设:
Protobuf 架构 class 不可序列化。然而,这个假设可能是错误的,因为虽然我知道 pickle
无法序列化 protobuf 模式,但如果我使用 dill
,我能够序列化 protobuf 模式。但是除此之外,我仍然有点不确定 python beam 中的 DoFn 是如何实现序列化的(例如:当它使用 dill
或 pickle
来序列化时,序列化格式是什么使其可序列化并与 DoFn 等兼容的对象)
DoFn 中的导入错误 class。由于 function/class 范围和数据流工作人员,我遇到了 python beam 的几个导入错误问题,为了解决这个问题,我不得不在需要它的函数中本地导入包,而不是在模块中全局导入包.所以也许,如果我们将 protobuf 模式 class 传递给 DoFn,模式导入实际上是在 DoFn 之外完成的,因此 DoFn 无法正确解析 DoFn 内部的 class 名称?
我的问题是:
- 为什么会出现这个错误,我该如何解决这个错误?
- 是否可以传递 protobuf 架构 class?或者是否有更好的方法来实现 python dict 解析器 DoFn 的通用 protobuf 而无需将 protobuf 模式 class 传递给 DoFn?
- Python 中的 DoFn 如何工作,我如何确保传递给 DoFn 创建的对象 (
__init__
) 是可序列化的? beam 上是否有我可以继承的可序列化 class,以便我可以将不可序列化的对象转换为可序列化的对象?
非常感谢!非常感谢您的帮助。
我实际上找到了一个替代解决方案,用于使用 beam.Map
创建通用 Protobuf 解析器
def convert_proto_to_dict(data, schema_class):
message = schema_class()
if isinstance(data, (str, bytes)):
message.ParseFromString(data)
else:
message = data
return MessageToDict(message, preserving_proto_field_name=True)
def run_pubsub_to_gbq_pipeline(argv):
... options initialization
from datapipes.protos import data_pb2
with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
'Print Result' >> beam.Map(lambda x: print_data(x))
所以首先,我创建了一个函数,它接收一个 protobuf 模式 class 和 protobuf 数据(当前以字节字符串形式)作为参数。该函数会将字符串字节数据初始化并解析为protobuf消息,并将protobuf消息转换为python字典。
此函数随后被 beam.Map
使用,所以现在我能够在没有 beam.DoFn
的情况下在 beam 上开发一个通用的 Protobuf 解析器。但是,我仍然很好奇为什么 protobuf 架构 class 在与 DoFn 一起使用时会出现问题,所以如果您知道为什么以及如何解决这个问题,请在这里分享您的答案,谢谢!
上下文
我正在使用在 pubsub 中具有 protobuf 数据源的流式传输管道。我希望将此 protobuf 解析为 python 字典,因为数据接收器要求输入是字典的集合。我通过在 DoFn 的 process
函数中初始化 protobuf 消息成功开发了 Protobuf 解析器。
为什么需要通用 Protobuf 解析器
但是,我想知道,是否可以在 Beam 上制作一个通用的 ProtobufParser DoFn?从工程角度来看,通用 DoFn 很有用,可以避免重新实现现有功能并实现代码重用。在 Java 中,我知道我们可以使用泛型,因此在 Java 中实现这个泛型 ProtobufParser 相对容易。由于 Python 函数是 first-class 对象,我在考虑是否可以将 Protobuf 模式 class (不是消息实例对象)传递到 DoFn 中。我尝试这样做,但我一直失败。
成功的解析器,但警告:不可推广
下面是我目前成功的 protobuf 解析器。 protobuf 消息在 process
函数中初始化。
class ParsePubSubProtoToDict(beam.DoFn):
def process(self, element, *args, **kwargs):
from datapipes.protos.data_pb2 import DataSchema
from google.protobuf.json_format import MessageToDict
message = DataSchema()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
虽然上述 Protobuf DoFn 解析器可以正常工作,但它并未推广到所有 protobuf 模式,因此这将导致需要为不同的 protobuf 模式重新实现新的 DoFn 解析器。
我的尝试
为了使解析器对所有 protobuf 模式通用,我尝试将 protobuf 模式(在 Python 中作为 class 生成)传递给 DoFn。
class ParsePubSubProtoToDict(beam.DoFn):
def __init__(self, proto_class):
self.proto_class = proto_class
def process(self, element, *args, **kwargs):
from google.protobuf.json_format import MessageToDict
message = self.proto_class()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
def run_pubsub_to_gbq_pipeline(argv):
...
from datapipes.protos import data_pb2
with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
'Print Result' >> beam.Map(lambda x: print_data(x))
和其他类似技术,但是,我的所有尝试都失败了,并显示相同的错误消息:
pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema
从这个错误消息中,我对问题发生的原因有两个假设:
Protobuf 架构 class 不可序列化。然而,这个假设可能是错误的,因为虽然我知道
pickle
无法序列化 protobuf 模式,但如果我使用dill
,我能够序列化 protobuf 模式。但是除此之外,我仍然有点不确定 python beam 中的 DoFn 是如何实现序列化的(例如:当它使用dill
或pickle
来序列化时,序列化格式是什么使其可序列化并与 DoFn 等兼容的对象)DoFn 中的导入错误 class。由于 function/class 范围和数据流工作人员,我遇到了 python beam 的几个导入错误问题,为了解决这个问题,我不得不在需要它的函数中本地导入包,而不是在模块中全局导入包.所以也许,如果我们将 protobuf 模式 class 传递给 DoFn,模式导入实际上是在 DoFn 之外完成的,因此 DoFn 无法正确解析 DoFn 内部的 class 名称?
我的问题是:
- 为什么会出现这个错误,我该如何解决这个错误?
- 是否可以传递 protobuf 架构 class?或者是否有更好的方法来实现 python dict 解析器 DoFn 的通用 protobuf 而无需将 protobuf 模式 class 传递给 DoFn?
- Python 中的 DoFn 如何工作,我如何确保传递给 DoFn 创建的对象 (
__init__
) 是可序列化的? beam 上是否有我可以继承的可序列化 class,以便我可以将不可序列化的对象转换为可序列化的对象?
非常感谢!非常感谢您的帮助。
我实际上找到了一个替代解决方案,用于使用 beam.Map
def convert_proto_to_dict(data, schema_class):
message = schema_class()
if isinstance(data, (str, bytes)):
message.ParseFromString(data)
else:
message = data
return MessageToDict(message, preserving_proto_field_name=True)
def run_pubsub_to_gbq_pipeline(argv):
... options initialization
from datapipes.protos import data_pb2
with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
'Print Result' >> beam.Map(lambda x: print_data(x))
所以首先,我创建了一个函数,它接收一个 protobuf 模式 class 和 protobuf 数据(当前以字节字符串形式)作为参数。该函数会将字符串字节数据初始化并解析为protobuf消息,并将protobuf消息转换为python字典。
此函数随后被 beam.Map
使用,所以现在我能够在没有 beam.DoFn
的情况下在 beam 上开发一个通用的 Protobuf 解析器。但是,我仍然很好奇为什么 protobuf 架构 class 在与 DoFn 一起使用时会出现问题,所以如果您知道为什么以及如何解决这个问题,请在这里分享您的答案,谢谢!