为什么自定义 Python 对象不能与 ParDo Fn 一起使用?
Why does custom Python object cannot be used with ParDo Fn?
我目前不熟悉在 Python 中将 Apache Beam 与数据流运行器一起使用。我有兴趣创建一个发布到 Google Cloud PubSub 的批处理管道,我修改了 Beam Python API 并找到了解决方案。然而,在探索过程中,我遇到了一些有趣的问题,这让我很好奇。
1。成功的管道
目前,我成功的从 GCS 批量发布数据的 Beam 管道如下所示:
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
options = PipelineOptions(flags=argv)
from datapipes.common.dataflow_utils import CsvFileSource
from datapipes.protos import proto_schemas_pb2
from google.protobuf.json_format import MessageToJson
with beam.Pipeline(options=options) as p:
normalized_data = (
p |
"Read CSV from GCS" >> beam.io.Read(CsvFileSource(
"gs://bucket/path/to/file.csv")) |
"Normalize to Proto Schema" >> beam.Map(
lambda data: MessageToJson(
proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
indent=0,
preserving_proto_field_name=True)
)
)
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
)
2。不成功的管道
在这里,我试图让发布者共享 DoFn
。我曾尝试过以下方法。
一个。在 DoFn
中初始化发布者
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
from google.cloud import pubsub_v1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
self.publisher = pubsub_v1.PublisherClient(batch_settings)
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
... ## same as 1
b.在 DoFn 之外初始化 Publisher,并将其传递给 DoFn
class PublishFn(beam.DoFn):
def __init__(self, publisher, topic_path):
self.publisher = publisher
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
.... ## same as 1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
with beam.Pipeline(options=options) as p:
... # same as 1
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
)
使发布者在 DoFn
方法之间共享的两次尝试均失败,并显示以下错误消息:
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
和
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
我的问题是:
共享发布器的实现是否会提高波束管道性能?如果是,那么我想探索这个解决方案。
为什么我的失败管道会出现错误?是由于初始化自定义 class 对象并将其传递给 process
函数之外的 DoFn 吗?如果是因为这个原因,我该如何实现一个管道,以便我能够在 DoFn 中重用自定义对象?
谢谢,非常感谢您的帮助。
编辑:解决方案
好的,Ankur 已经解释了我的问题出现的原因,并讨论了如何在 DoFn 上完成序列化。基于这些知识,我现在了解到在 DoFn 中制作自定义对象 shared/reusable 有两种解决方案:
使自定义对象可序列化:这允许对象在 DoFn 对象创建期间成为 initialized/available(在 __init__
下)。该对象必须是可序列化的,因为它将在管道提交期间被序列化,在管道提交期间将创建 DoFn 对象(调用 __init__
)。我的回答在下面回答了如何实现这一点。此外,我发现此要求实际上与 [1][2].
下的 Beam 文档相关联
在 __init__
之外的 DoFn 函数中初始化不可序列化的对象以避免序列化,因为在管道提交期间不会调用 init 之外的函数。 Ankur 的回答中解释了如何完成此操作。
参考文献:
[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms
PublisherClient
无法正确腌制。更多关于酸洗 here。
在process
方法中初始化PublisherClient
避免了PublisherClient
.
的酸洗
如果打算重用 PublisherClient
,我建议在 process 方法中初始化 PublisherClient
并使用以下代码将其存储在 self
中。
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
if not hasattr(self, 'publish'):
from google.cloud import pubsub_v1
self.publisher = pubsub_v1.PublisherClient()
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
感谢Ankur,我发现这个问题是由于python中的酸洗问题。然后,我尝试通过先解决 pickling PublisherClient
的问题来隔离问题,并找到了在 Beam 上跨 DoFn
共享 PublisherClient
的解决方案。
在 python 中,我们可以使用 dill
包来 pickle 自定义对象,我意识到这个包已经在 Beam python 实现中用于 pickle 对象。所以我尝试排查问题,发现这个错误:
TypeError: no default __reduce__ due to non-trivial __cinit__
然后,我尝试修复这个错误,我的管道现在可以工作了!
解决方法如下:
class PubsubClient(PublisherClient):
def __reduce__(self):
return self.__class__, (self.batch_settings,)
# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
from google.cloud import pubsub_v1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
self.publisher = PubsubClient(batch_settings=batch_settings)
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(topic=self.topic_path, data=element.encode("utf-8"))
return future.result()
# ...the run_gcs_to_pubsub is the same as my successful pipeline
解决方案是这样的:首先,我从 PublisherClient
继承并自己实现 __reduce__
函数。请注意,因为我只使用 batch_settings
属性 来初始化我的 PublisherClient
,所以这个 属性 足以满足我的 __reduce__
功能。然后,我在 __init__
中将此修改后的 PublisherClient
用于我的 DoFn。
希望通过这个新的解决方案,我的管道能够获得性能改进。
我目前不熟悉在 Python 中将 Apache Beam 与数据流运行器一起使用。我有兴趣创建一个发布到 Google Cloud PubSub 的批处理管道,我修改了 Beam Python API 并找到了解决方案。然而,在探索过程中,我遇到了一些有趣的问题,这让我很好奇。
1。成功的管道
目前,我成功的从 GCS 批量发布数据的 Beam 管道如下所示:
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
options = PipelineOptions(flags=argv)
from datapipes.common.dataflow_utils import CsvFileSource
from datapipes.protos import proto_schemas_pb2
from google.protobuf.json_format import MessageToJson
with beam.Pipeline(options=options) as p:
normalized_data = (
p |
"Read CSV from GCS" >> beam.io.Read(CsvFileSource(
"gs://bucket/path/to/file.csv")) |
"Normalize to Proto Schema" >> beam.Map(
lambda data: MessageToJson(
proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
indent=0,
preserving_proto_field_name=True)
)
)
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
)
2。不成功的管道
在这里,我试图让发布者共享 DoFn
。我曾尝试过以下方法。
一个。在 DoFn
中初始化发布者class PublishFn(beam.DoFn):
def __init__(self, topic_path):
from google.cloud import pubsub_v1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
self.publisher = pubsub_v1.PublisherClient(batch_settings)
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
... ## same as 1
b.在 DoFn 之外初始化 Publisher,并将其传递给 DoFn
class PublishFn(beam.DoFn):
def __init__(self, publisher, topic_path):
self.publisher = publisher
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
.... ## same as 1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
with beam.Pipeline(options=options) as p:
... # same as 1
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
)
使发布者在 DoFn
方法之间共享的两次尝试均失败,并显示以下错误消息:
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
和
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
我的问题是:
共享发布器的实现是否会提高波束管道性能?如果是,那么我想探索这个解决方案。
为什么我的失败管道会出现错误?是由于初始化自定义 class 对象并将其传递给
process
函数之外的 DoFn 吗?如果是因为这个原因,我该如何实现一个管道,以便我能够在 DoFn 中重用自定义对象?
谢谢,非常感谢您的帮助。
编辑:解决方案
好的,Ankur 已经解释了我的问题出现的原因,并讨论了如何在 DoFn 上完成序列化。基于这些知识,我现在了解到在 DoFn 中制作自定义对象 shared/reusable 有两种解决方案:
使自定义对象可序列化:这允许对象在 DoFn 对象创建期间成为 initialized/available(在
__init__
下)。该对象必须是可序列化的,因为它将在管道提交期间被序列化,在管道提交期间将创建 DoFn 对象(调用__init__
)。我的回答在下面回答了如何实现这一点。此外,我发现此要求实际上与 [1][2]. 下的 Beam 文档相关联
在
__init__
之外的 DoFn 函数中初始化不可序列化的对象以避免序列化,因为在管道提交期间不会调用 init 之外的函数。 Ankur 的回答中解释了如何完成此操作。
参考文献:
[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms
PublisherClient
无法正确腌制。更多关于酸洗 here。
在process
方法中初始化PublisherClient
避免了PublisherClient
.
如果打算重用 PublisherClient
,我建议在 process 方法中初始化 PublisherClient
并使用以下代码将其存储在 self
中。
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
if not hasattr(self, 'publish'):
from google.cloud import pubsub_v1
self.publisher = pubsub_v1.PublisherClient()
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
感谢Ankur,我发现这个问题是由于python中的酸洗问题。然后,我尝试通过先解决 pickling PublisherClient
的问题来隔离问题,并找到了在 Beam 上跨 DoFn
共享 PublisherClient
的解决方案。
在 python 中,我们可以使用 dill
包来 pickle 自定义对象,我意识到这个包已经在 Beam python 实现中用于 pickle 对象。所以我尝试排查问题,发现这个错误:
TypeError: no default __reduce__ due to non-trivial __cinit__
然后,我尝试修复这个错误,我的管道现在可以工作了!
解决方法如下:
class PubsubClient(PublisherClient):
def __reduce__(self):
return self.__class__, (self.batch_settings,)
# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
from google.cloud import pubsub_v1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
self.publisher = PubsubClient(batch_settings=batch_settings)
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(topic=self.topic_path, data=element.encode("utf-8"))
return future.result()
# ...the run_gcs_to_pubsub is the same as my successful pipeline
解决方案是这样的:首先,我从 PublisherClient
继承并自己实现 __reduce__
函数。请注意,因为我只使用 batch_settings
属性 来初始化我的 PublisherClient
,所以这个 属性 足以满足我的 __reduce__
功能。然后,我在 __init__
中将此修改后的 PublisherClient
用于我的 DoFn。
希望通过这个新的解决方案,我的管道能够获得性能改进。