How to publish to Pub/Sub from Dataflow in batch (efficiently)?

From a Dataflow job running in batch mode, I want to publish messages to a Pub/Sub topic with some attributes.

My Dataflow pipeline is written with Python 3.8 and apache-beam 2.27.0.

It works with @Ankur's solution here:

But I think it could be more efficient with a shared Pub/Sub client:

But I get this error:

return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'PublishFn' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.8/site-packages/dataflow_worker/start.py'>

Questions:

  1. Does a shared publisher implementation improve Beam pipeline performance?
  2. Is there another way to avoid the pickling error on my shared publisher client?

My Dataflow pipeline:

import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from google.cloud.pubsub_v1 import PublisherClient

import json
import argparse
import re
import logging


class PubsubClient(PublisherClient):
    # Tell pickle how to rebuild the client from its batch settings.
    def __reduce__(self):
        return self.__class__, (self.batch_settings,)


# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
    def __init__(self):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )

        self.publisher = PubsubClient(batch_settings)
        super().__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(
            topic=element["topic"],
            data=json.dumps(element["data"]).encode("utf-8"),
            **element["attributes"],
        )

        return future.result()


def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--source_table_id",
        dest="source_table_id",
        default="",
        help="BigQuery source table <project>.<dataset>.<table> with columns (topic, attributes, data)",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).

    pipeline_options = PipelineOptions(pipeline_args)
    # pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    bq_source_table = known_args.source_table_id
    bq_table_regex = r"^(?P<PROJECT_ID>[a-zA-Z0-9_-]*)[\.|\:](?P<DATASET_ID>[a-zA-Z0-9_]*)\.(?P<TABLE_ID>[a-zA-Z0-9_-]*)$"

    regex_match = re.search(bq_table_regex, bq_source_table)

    if not regex_match:
        raise ValueError(
            f"Bad BigQuery table id : `{bq_source_table}` please match {bq_table_regex}"
        )

    table_ref = bigquery.TableReference(
        projectId=regex_match.group("PROJECT_ID"),
        datasetId=regex_match.group("DATASET_ID"),
        tableId=regex_match.group("TABLE_ID"),
    )

    with beam.Pipeline(options=pipeline_options) as p:

        (
            p
            | "ReadFromBqTable" # 
            >> bigquery.ReadFromBigQuery(table=table_ref, use_json_exports=True) # Each row contains : topic / attributes / data
            | "PublishRowsToPubSub" >> beam.ParDo(PublishFn())
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

After some struggling, I think I have an answer that works consistently, even if it is not world-class performance-wise:

import logging

import apache_beam as beam
from apache_beam.io.gcp.pubsub import PubsubMessage

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import (
    BatchSettings,
    LimitExceededBehavior,
    PublishFlowControl,
    PublisherOptions,
)


class PublishClient(PublisherClient):
    """
    You have to override __reduce__ to make PublisherClient pickleable   

    Props to 'Ankur' and 'Benjamin' on SO for figuring this part out; god knows
    I would not have...
    """

    def __reduce__(self):
        return self.__class__, (self.batch_settings, self.publisher_options)


class PubsubWriter(beam.DoFn):
    """
    beam.io.gcp.pubsub does not yet support batch operations, so
    we do this the hard way.  it's not as performant as the native
    pubsubio but it does the job.
    """

    def __init__(self, topic: str):
        self.topic = topic
        self.window = beam.window.GlobalWindow()
        self.count = 0

        batch_settings = BatchSettings(
            max_bytes=1e6,  # 1MB
            # the library default is 10 ms; keep this well below the timeout
            # passed to future.result() in finish_bundle to avoid timing out there
            max_latency=1,
        )

        publisher_options = PublisherOptions(
            enable_message_ordering=False,
            # better to be slow than to drop messages during a recovery...
            flow_control=PublishFlowControl(limit_exceeded_behavior=LimitExceededBehavior.BLOCK),
        )

        self.publisher = PublishClient(batch_settings, publisher_options)

    def start_bundle(self):
        self.futures = []

    def process(self, element: PubsubMessage, window=beam.DoFn.WindowParam):
        self.window = window
        self.futures.append(
            self.publisher.publish(
                topic=self.topic,
                data=element.data,
                **element.attributes,
            )
        )

    def finish_bundle(self):
        """Iterate over the list of async publish results and block
        until all of them have either succeeded or timed out.  Yield
        a WindowedValue of the success/fail counts."""

        results = []
        self.count = self.count + len(self.futures)
        for fut in self.futures:
            try:
                # future.result() blocks until success or timeout; with
                # max_latency=1s in the BatchSettings above, batches flush
                # quickly, so we should rarely wait long here before the 60s timeout.
                results.append(fut.result(timeout=60))
            except Exception as ex:
                results.append(ex)

        res_count = {"success": 0}
        for res in results:
            # a successful publish future resolves to the message ID (a string)
            if isinstance(res, str):
                res_count["success"] += 1
            else:
                # if it's not a string, it's an exception
                msg = str(res)
                if msg not in res_count:
                    res_count[msg] = 1
                else:
                    res_count[msg] += 1

        logging.info(f"Pubsub publish results: {res_count}")

        yield beam.utils.windowed_value.WindowedValue(
            value=res_count,
            timestamp=0,
            windows=[self.window],
        )

    def teardown(self):
        logging.info(f"Published {self.count} messages")

The trick is that if you call future.result() in the process() method, you block until that single message has been successfully published, so instead collect a list of futures and then, in finish_bundle(), make sure they have all either been published or definitively timed out. Some quick testing against one of our internal pipelines suggests this approach can publish about 1.6 million messages in ~200 seconds.
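For context, here is a minimal sketch of how PubsubWriter could be wired into a batch pipeline, reusing the class above. The BigQuery table, the row-to-PubsubMessage mapping, and the topic name are illustrative assumptions, not part of the tested pipeline:

import json

import apache_beam as beam
from apache_beam.io.gcp.pubsub import PubsubMessage
from apache_beam.options.pipeline_options import PipelineOptions


def to_pubsub_message(row):
    # Hypothetical mapping: assumes each BigQuery row has "data" and "attributes" columns.
    return PubsubMessage(
        data=json.dumps(row["data"]).encode("utf-8"),
        attributes=row["attributes"],
    )


def run_example(pipeline_args=None):
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        (
            p
            | "ReadRows" >> beam.io.ReadFromBigQuery(table="my-project:my_dataset.my_table")  # placeholder table
            | "ToPubsubMessage" >> beam.Map(to_pubsub_message)
            | "PublishToPubsub" >> beam.ParDo(PubsubWriter(topic="projects/my-project/topics/my-topic"))  # placeholder topic
        )

Because batching happens per bundle, throughput depends on how large the runner makes each bundle; on the batch Dataflow runner bundles are typically large enough for the BatchSettings above to take effect.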