How to set coder for Google Dataflow Pipeline in Python?

I am creating a custom Dataflow job in Python to ingest data from PubSub into BigQuery. The table has many nested fields.

Where in this pipeline can I set the coder?

avail_schema = parse_table_schema_from_json(bg_out_schema)
coder = TableRowJsonCoder(table_schema=avail_schema)

with beam.Pipeline(options=options) as p:
    # Read the text from PubSub messages.
    lines = (p | beam.io.ReadFromPubSub(subscription="projects/project_name/subscriptions/subscription_name")
              | 'Map' >> beam.Map(coder))
    # transformed = lines| 'Parse JSON to Dict' >> beam.Map(json.loads)
    transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery("Project:DataSet.Table", schema=avail_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)


Error: Map can be used only with callable objects. Received TableRowJsonCoder instead.

In the code above, the coder is being applied to the text messages read from PubSub, which causes the error: `beam.Map` expects a callable, and `TableRowJsonCoder` is not one.

WriteToBigQuery works with both dictionaries and TableRows. `json.loads` emits a dictionary, so you can simply write its output to BigQuery without applying any coder. Note that the fields in the dictionary must match the table schema.
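For example, a nested PubSub payload parses directly into a dictionary whose keys must line up with the schema's field names (the message content and field names below are illustrative):

```python
import json

# A hypothetical PubSub payload with a nested "address" record.
message = b'{"name": "Alice", "address": {"city": "Paris", "zip": "75001"}}'

# json.loads turns the raw bytes into a plain dict. WriteToBigQuery can
# consume this directly, as long as "name" and the nested "address" record
# (with "city" and "zip") exist in the BigQuery table schema.
row = json.loads(message)
```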

To avoid the coder problem, I suggest the following code.

    import json

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

    avail_schema = parse_table_schema_from_json(bg_out_schema)

    with beam.Pipeline(options=options) as p:
        # Read the raw text from PubSub messages.
        lines = p | beam.io.ReadFromPubSub(
            subscription="projects/project_name/subscriptions/subscription_name")
        # Parse each message into a dict; no coder is needed.
        transformed = lines | 'Parse JSON to Dict' >> beam.Map(json.loads)
        transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            "Project:DataSet.Table",
            schema=avail_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
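For reference, the `bg_out_schema` string passed to `parse_table_schema_from_json` describes nested fields with `RECORD`-typed entries. A minimal sketch of what such a schema JSON might look like (the field names here are illustrative, not from your table):

```python
import json

# Illustrative schema JSON with one nested RECORD field. A string in this
# shape is what parse_table_schema_from_json expects as input.
bg_out_schema = json.dumps({
    "fields": [
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "address", "type": "RECORD", "mode": "NULLABLE",
         "fields": [
             {"name": "city", "type": "STRING", "mode": "NULLABLE"},
             {"name": "zip", "type": "STRING", "mode": "NULLABLE"},
         ]},
    ]
})
```

The dictionaries emitted by `json.loads` must mirror this nesting: a top-level key per field, with nested records as nested dicts.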