ReadFromPubSub->CloudStorage->BigQuery:订阅的大小从未减少,似乎只有 0.002% 到达 BigQuery

ReadFromPubSub->CloudStorage->BigQuery: Subscription never decreases in size and only 0.002% seemingly arrive in BigQuery

管道正在使用 ReadFromPubSub 源读取云存储 blob 的链接,读取存储在每个文件中的事件,然后将它们插入 BigQuery:

with beam.Pipeline(options=pipeline_options) as pipeline:
    dlq = DeadletterQueue(known_args.output_dlq)

    pipeline = (
        pipeline
        | "Read PubSub Messages"
        >> beam.io.ReadFromPubSub(
            topic=topic,
            id_label="messageId",
        )
        | "Read Records" >> ReadCloudStorageEvents(deadletter_queue=dlq)
        | "Parse Events" >> ParseEventRecords(deadletter_queue=dlq)
        | "window events" >> WindowOnTimeAndSize(60, 10)
        | "Upload To BigQuery" >> BigQuerySink(project, deadletter_queue=dlq)
    )

问题是,即使从 PubSub 以极快的速度消耗项目,并且以同样快的速度从云存储读取文件,它们也不会以接近任何速度插入 BigQuery bigquery 流限制。

数据新鲜度和系统延迟不断攀升:.

这样做的副作用是不会删除队列项目。

BigQuerySink 本质上是这样的:

class BigQuerySink(beam.PTransform):
    def __init__(self, project: str, deadletter_queue: beam.PTransform):
        self.deadletter_queue = deadletter_queue
        self.project = project

    def expand(self, pcoll):
        def yield_from(events: Iterable[Dict]) -> Iterable[Dict]:
            for event in events:
                yield event

        pcoll = (
            pcoll
            | "flatten events" >> beam.FlatMap(yield_from)
            | f"push events to BigQuery"
            >> beam.io.WriteToBigQuery(
                table=lambda event: f"{self.project}:events_v2.{event['type']}",
                schema=lambda table: load_schema(table.split(".")[-1]),
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
                validate=True,
                additional_bq_parameters={
                    "clustering": {"fields": ["accountId"]},
                    "timePartitioning": {"type": "DAY", "field": "receivedAt"},
                },
            )
        )

        # # 
        pcoll[
            beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS
        ] | "Map to Error" >> beam.Map(
            lambda x: Error(
                message=f"BigQuery exception",
                record={"destination": x[0], "index": x[1]},
                data=None,
                stacktrace=None,
            )
            | self.deadletter_queue
        )

使用 200.000 个元素进行测试,引用总共包含大约 10,000,000 个事件的文件导致只有大约 0.002% 的事件进入 BigQuery。我们离配额不远了,我没有看到任何错误或任何东西(当字段与架构不匹配时,有时我会看到一些错误,但我什么也没看到)。

任何关于确定哪里出了问题的见解都将非常受欢迎。有没有什么地方可以查看这些项目是否在 BigQuery 端未通过某些验证,因此没有在订阅中被删除或?

似乎是瓶颈的步骤是 _StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps),如下图所示。接近 1000 MB 进入该步骤,只有 5 MB 退出(如果我正确阅读图表):

更新:WindowOnTimeAndSize

class WindowOnTimeAndSize(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size_seconds, after_count):
        # Convert minutes into seconds.
        self.window_size = int(window_size_seconds)
        self.after_count = after_count

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(
                beam.window.FixedWindows(self.window_size),
                trigger=Repeatedly(
                    AfterAny(
                        AfterCount(self.after_count),
                        AfterProcessingTime(self.window_size),
                    )
                ),
                accumulation_mode=AccumulationMode.DISCARDING,
            )
        )

您最好的资源可能是数据流监控控制台,特别是当 viewing the pipeline 您可以单击各个步骤以确定是哪个步骤导致了延迟。不要忘记,可以通过单击小 v 形符号向下钻取到有问题的部分来展开复合变换。

对 WriteToBigQuery 进行了一些更改,导致转换速度非常慢。在 Beam 2.24.0(几周后即将推出)中,转换应该能够达到更高的性能(我已经测试了每个 CPU ~500-600 EPS)。

抱歉给您带来麻烦。您今天可以使用 head 上的代码,或者等待几周才能获得 Beam 2.24.0。