How to set coder for Google Dataflow Pipeline in Python?
I am creating a custom Dataflow job in Python to ingest data from PubSub into BigQuery. The table has many nested fields.
Where can I set the coder in this pipeline?
avail_schema = parse_table_schema_from_json(bg_out_schema)
coder = TableRowJsonCoder(table_schema=avail_schema)
with beam.Pipeline(options=options) as p:
    # Read the text from PubSub messages.
    lines = (p | beam.io.ReadFromPubSub(subscription="projects/project_name/subscriptions/subscription_name")
             | 'Map' >> beam.Map(coder))
    # transformed = lines| 'Parse JSON to Dict' >> beam.Map(json.loads)
    transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery("Project:DataSet.Table", schema=avail_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
Error: Map can be used only with callable objects. Received TableRowJsonCoder instead.
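In the code above, the coder is applied to the text messages read from PubSub by passing it to beam.Map. beam.Map expects a callable, and a TableRowJsonCoder instance is not one, which is why the error is raised.
WriteToBigQuery works with both dictionaries and TableRow objects. json.loads emits a dictionary, so you can write its output straight to BigQuery without applying any coder. Note that the fields in the dictionary must match the table schema, including the nested fields.
To avoid the coder issue, I suggest the following code.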
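import json
import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

# bg_out_schema (JSON schema string) and options are defined as in the question.
avail_schema = parse_table_schema_from_json(bg_out_schema)
with beam.Pipeline(options=options) as p:
    # Read the text from PubSub messages.
    lines = p | beam.io.ReadFromPubSub(subscription="projects/project_name/subscriptions/subscription_name")
    # Each message is a JSON payload; json.loads turns it into a dict.
    transformed = lines | 'Parse JSON to Dict' >> beam.Map(json.loads)
    transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery("Project:DataSet.Table", schema=avail_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)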
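Since the dictionary fields have to match the table schema, it can help to check a sample message locally before running the pipeline. The following is only a rough sketch in plain Python, not part of Beam: the schema, the field names (id, address, city, zip, country) and the helper unknown_fields are all made up for illustration, and the check only covers field names, not types or modes.

```python
import json

# Hypothetical schema in the usual BigQuery JSON layout;
# the field names are invented for this example.
SCHEMA = {
    "fields": [
        {"name": "id", "type": "STRING", "mode": "REQUIRED"},
        {"name": "address", "type": "RECORD", "mode": "NULLABLE", "fields": [
            {"name": "city", "type": "STRING", "mode": "NULLABLE"},
            {"name": "zip", "type": "STRING", "mode": "NULLABLE"},
        ]},
    ]
}


def unknown_fields(row, fields, prefix=""):
    """Return dotted paths of keys in `row` that the schema does not declare."""
    declared = {f["name"]: f for f in fields}
    unknown = []
    for key, value in row.items():
        path = prefix + key
        if key not in declared:
            unknown.append(path)
            continue
        nested = declared[key].get("fields")
        if nested:
            # A repeated record arrives as a list of dicts, a single record as one dict.
            items = value if isinstance(value, list) else [value]
            for item in items:
                if isinstance(item, dict):
                    unknown.extend(unknown_fields(item, nested, path + "."))
    return unknown


# PubSub delivers the payload as bytes; json.loads accepts bytes directly.
message = b'{"id": "42", "address": {"city": "Berlin", "country": "DE"}}'
row = json.loads(message)
print(unknown_fields(row, SCHEMA["fields"]))
```

Here the nested key address.country is reported because the sample schema does not declare it. An empty list means every key in the message has a matching field name in the schema.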