ReadFromPubSub->CloudStorage->BigQuery: Subscription never decreases in size and only 0.002% seemingly arrive in BigQuery
The pipeline uses a ReadFromPubSub source to read links to Cloud Storage blobs, reads the events stored in each file, and then inserts them into BigQuery:
with beam.Pipeline(options=pipeline_options) as pipeline:
    dlq = DeadletterQueue(known_args.output_dlq)

    pipeline = (
        pipeline
        | "Read PubSub Messages"
        >> beam.io.ReadFromPubSub(
            topic=topic,
            id_label="messageId",
        )
        | "Read Records" >> ReadCloudStorageEvents(deadletter_queue=dlq)
        | "Parse Events" >> ParseEventRecords(deadletter_queue=dlq)
        | "window events" >> WindowOnTimeAndSize(60, 10)
        | "Upload To BigQuery" >> BigQuerySink(project, deadletter_queue=dlq)
    )
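For reference (this variant is not from the original pipeline): ReadFromPubSub can also consume from an existing subscription instead of a topic. When topic= is given, Dataflow creates its own subscription for the job, so a separately created subscription is not drained by this pipeline. A minimal sketch with a placeholder subscription path:

# Hypothetical variant, placeholder names: read an existing subscription
# directly so that its backlog is what the pipeline consumes.
import apache_beam as beam

with beam.Pipeline(options=pipeline_options) as pipeline:
    messages = (
        pipeline
        | "Read PubSub Messages"
        >> beam.io.ReadFromPubSub(
            subscription="projects/my-project/subscriptions/my-subscription",
            id_label="messageId",
        )
    )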
The problem is that even though items are consumed from Pub/Sub extremely fast, and the files are read from Cloud Storage just as quickly, they are not being inserted into BigQuery at anywhere near the BigQuery streaming limits.
Data freshness and system latency keep climbing.
A side effect of this is that the queued items are never removed.
BigQuerySink essentially looks like this:
from typing import Dict, Iterable

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

# load_schema, Error and the dead-letter queue transform are defined elsewhere.


class BigQuerySink(beam.PTransform):
    def __init__(self, project: str, deadletter_queue: beam.PTransform):
        self.deadletter_queue = deadletter_queue
        self.project = project

    def expand(self, pcoll):
        def yield_from(events: Iterable[Dict]) -> Iterable[Dict]:
            for event in events:
                yield event

        pcoll = (
            pcoll
            | "flatten events" >> beam.FlatMap(yield_from)
            | "push events to BigQuery"
            >> beam.io.WriteToBigQuery(
                table=lambda event: f"{self.project}:events_v2.{event['type']}",
                schema=lambda table: load_schema(table.split(".")[-1]),
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
                validate=True,
                additional_bq_parameters={
                    "clustering": {"fields": ["accountId"]},
                    "timePartitioning": {"type": "DAY", "field": "receivedAt"},
                },
            )
        )

        # Route rows that failed to insert to the dead-letter queue.
        (
            pcoll[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
            | "Map to Error"
            >> beam.Map(
                lambda x: Error(
                    message="BigQuery exception",
                    record={"destination": x[0], "index": x[1]},
                    data=None,
                    stacktrace=None,
                )
            )
            | self.deadletter_queue
        )
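DeadletterQueue and Error are defined elsewhere in the project and are not shown above. Purely as an assumption about what they might look like, so the snippets read self-contained (the real implementations may well differ), here is a minimal sketch:

# Hypothetical stand-ins for the dead-letter helpers referenced above;
# the field names mirror how the code above uses them, everything else
# (Pub/Sub DLQ topic, JSON serialization) is an assumption.
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional

import apache_beam as beam


@dataclass
class Error:
    message: str
    record: Optional[Dict]
    data: Optional[Any]
    stacktrace: Optional[str]


class DeadletterQueue(beam.PTransform):
    """Serializes Error objects and publishes them to a Pub/Sub DLQ topic."""

    def __init__(self, output_topic: str):
        self.output_topic = output_topic

    def expand(self, pcoll):
        return (
            pcoll
            | "Serialize Errors"
            >> beam.Map(lambda e: json.dumps(asdict(e), default=str).encode("utf-8"))
            | "Publish To DLQ" >> beam.io.WriteToPubSub(self.output_topic)
        )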
Testing with 200,000 elements, referencing files that together contain roughly 10,000,000 events, resulted in only about 0.002% of the events making it into BigQuery. We're not far from the quotas, and I don't see any errors at all (I do sometimes see errors when a field doesn't match the schema, but here there is nothing).
Any insight into how to figure out where things go wrong would be very welcome. Is there anywhere I can check whether these items are failing some validation on the BigQuery side and are therefore not being removed from the subscription, or something like that?
The step that appears to be the bottleneck is _StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps), as seen in the graph below: close to 1000 MB goes into that step and only 5 MB comes out (if I'm reading the graph correctly).
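One way to narrow down where elements disappear, not part of the original pipeline, is to drop a pass-through DoFn with a custom Beam metric in front of the BigQuery sink; the counter then shows up on that step in the Dataflow monitoring UI and can be compared with the row count that actually lands in BigQuery. A minimal sketch with hypothetical step and counter names:

# Debugging aid (hypothetical names): count elements flowing into the sink.
import apache_beam as beam
from apache_beam.metrics import Metrics


class CountElements(beam.DoFn):
    """Pass-through DoFn that increments a custom counter for every element."""

    def __init__(self, counter_name: str):
        self.counter = Metrics.counter("debug", counter_name)

    def process(self, element):
        self.counter.inc()
        yield element


# Usage: insert just before the sink in the pipeline, e.g.
#   ... | "Count before BQ" >> beam.ParDo(CountElements("events_into_bq_sink"))
#       | "Upload To BigQuery" >> BigQuerySink(project, deadletter_queue=dlq)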
Update: the WindowOnTimeAndSize transform:
import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode,
    AfterAny,
    AfterCount,
    AfterProcessingTime,
    Repeatedly,
)


class WindowOnTimeAndSize(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size_seconds, after_count):
        self.window_size = int(window_size_seconds)
        self.after_count = after_count

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(
                beam.window.FixedWindows(self.window_size),
                trigger=Repeatedly(
                    AfterAny(
                        AfterCount(self.after_count),
                        AfterProcessingTime(self.window_size),
                    )
                ),
                accumulation_mode=AccumulationMode.DISCARDING,
            )
        )
Your best resource is probably the Dataflow monitoring console. In particular, when viewing the pipeline you can click on the individual steps to determine which one is introducing the delays. Don't forget that composite transforms can be expanded by clicking the little chevron, so you can drill down into the offending part.
There were some changes to WriteToBigQuery that caused the transform to become very slow. In Beam 2.24.0 (coming out in the next few weeks), the transform should be able to reach much higher performance (I've tested ~500-600 EPS per CPU).
Sorry for the trouble. You can use the code at head today, or wait a couple of weeks for Beam 2.24.0.
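As a quick sanity check before relying on that fix (a trivial sketch, not from the original answer), the SDK version the job is built against can be printed at submission time; upgrading then means pinning apache-beam[gcp] to 2.24.0 once it is released, or building the SDK from head and pointing the Dataflow job at the locally built artifact (for example via the sdk_location pipeline option).

# Sanity check: confirm which Beam SDK version the pipeline code imports.
# The WriteToBigQuery performance fix mentioned above lands in 2.24.0.
import apache_beam as beam

print(beam.__version__)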