How to fix "AttributeError: 'str' object has no attribute 'items'" in a Dataflow pipeline reading from PubSub and writing to BigQuery

How to fix "AttributeError: 'str' object has no attribute 'items'" in a Dataflow pipeline reading from PubSub and writing to BigQuery

我正在尝试使用 Dataflow 创建流式传输管道,从 PubSub 主题读取消息并将它们写入 BigQuery table。我不想使用任何模板。目前我只想在从 Google VM 实例执行的 Python3 脚本中创建一个管道来执行这个简单的过程,而无需对来自 Pubsub 的数据进行任何转换(消息的结构是 table 所期望的)。

PubSub主题发布的消息如下:

data = '{"A":1, "B":"Hey", "C":"You"}'
message = data.encode('utf-8')

这是我用于管道的函数:

pipeline_options = PipelineOptions(pipeline_args = None, streaming = True, 
save_main_session = True)
parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))))

# table_schema = ["fields" :[{"type":"INTEGER", "name":"A", 
# "mode":"REQUIRED"},{"type":"STRING", "name":"B", "mode":"NULLABLE"}, 
# {"type":"STRING", "name":"C", "mode":"NULLABLE"}]]


with beam.Pipeline(options=pipeline_options) as p:

    # Read the pubsub topic and write the menssage into a bigquery table

    message = ( p | beam.io.ReadFromPubSub(topic="projects/real- 
                    demand/topics/Test_API", subscription=None)
                  | beam.io.WriteToBigQuery(table = '$Table', dataset = 
                    '$Dataset', project = '$Project', schema = 
                    table_schema)
               )

我有以下错误:

 AttributeError: 'str' object has no attribute 'items'

您传递的是 string 而不是 JSON。您需要将输入字符串解析为 json,如下所示

def parse_pubsub(line):
    import json
    record = json.loads(line)
    return (record['A']), (record['B']), (record['C'])

并且在你的管道中你必须做这样的事情

  lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | beam.Map(parse_pubsub)
                | beam.Map(lambda (a_bq, b_bq, c_bq): {'A': a_bq, 'B': b_bq, 'C': c_bq})
                | beam.io.WriteToBigQuery(
                    known_args.output_table,
                    schema=' A:STRING, B:STRING, C:STRING',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )