Apache Beam: loading dictionaries into BigQuery

I am having trouble loading data into BigQuery using Apache Beam. The code makes an API call that returns rows which should be dictionaries (shown below). My understanding was that I should then run json.dumps() followed by json.loads() to produce something iterable that can be passed on to BQ. However, whenever I try this I run into problems with not being able to iterate over a str, even though at that point the element should no longer be a string. That makes me think something is not being handled correctly, but I am not sure what it could be. I don't have that much Python experience, so I am not sure where to go from here.

{"id":"1234","source":"example","country":"Example Country","region":"Example Region","exampleKey":"example","name":"Test","code":"null","currency":"EUR","status":1},"Detail":{"id":"1234","name":"example","code":"example","currency":"EUR"},"dateDetail":{"date":"2021-04-24","itemId":"1234"},"cost":[{"Type":"1","TypeName":"example","price":0.0}]}
{"id":"1234","source":"example","country":"Example Country","region":"Example Region","exampleKey":"example","name":"Test","code":"null","currency":"EUR","status":1},"Detail":{"id":"1234","name":"example","code":"example","currency":"EUR"},"dateDetail":{"date":"2021-04-24","itemId":"5678"},"cost":[{"Type":"1","TypeName":"example","price":0.0}]}

The pipeline code is as follows:

    import json
    import logging

    import apache_beam as beam
    import requests
    from requests.exceptions import HTTPError

    # REMOTE_URI, HEADER, table_name1 and table_schema are defined elsewhere


    class callAPI(beam.DoFn):
        def __init__(self, input_header):
            self.headers = input_header

        def process(self, input_uri):
            try:
                res = requests.get(input_uri, headers=self.headers)
                res.raise_for_status()
            except HTTPError as message:
                logging.error(message)
                return

            data = json.loads(json.dumps(res.text))

            yield data


    def run():
        with beam.Pipeline() as p:
            data = (
                p
                | beam.Create([REMOTE_URI])
                | 'Call API ' >> beam.ParDo(callAPI(HEADER))
                | 'Write to BQ ' >> beam.io.WriteToBigQuery(
                    table=table_name1,
                    schema=table_schema,
                    method="STREAMING_INSERTS",
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )


    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        run()

The error:

    AttributeError: 'str' object has no attribute 'items' [while running 'Write to BQ /_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
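
What the traceback is saying: with STREAMING_INSERTS, the BigQuery sink treats each incoming element as a row dict and calls .items() on it, but because json.loads(json.dumps(res.text)) just returns res.text unchanged, the element that reaches the sink is still a str. A minimal reproduction of the same failure, with a hypothetical abbreviated row:

    row = '{"id": "1234"}'  # what the sink actually receives: the raw response text
    row.items()             # AttributeError: 'str' object has no attribute 'items'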

I tried splitting the JSON as per the snippet below:

    class Transaction(beam.DoFn):
        def process(self, element):
            result = json.loads(element)

            data_id = result['id']
            data_source = result['source']
            data_country = result['country']
            data_region = result['region']
            data_exampleKey = result['exampleKey']
            data_name = result['name']
            data_code = result['code']
            data_currency = result['currency']
            data_status = result['status']

            return [{"id": data_id, "source": data_source, "country": data_country,
                     "region": data_region, "exampleKey": data_exampleKey,
                     "name": data_name, "code": data_code,
                     "currency": data_currency, "status": data_status}]
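
For reference, the same projection can be written more compactly with a dict comprehension over the wanted top-level keys. WANTED_KEYS is just an illustrative name, and this sketch emits the same single row dict as the version above; it does not address the error itself:

    WANTED_KEYS = ["id", "source", "country", "region", "exampleKey",
                   "name", "code", "currency", "status"]

    class Transaction(beam.DoFn):
        def process(self, element):
            # Keep only the wanted top-level fields of each parsed row
            result = json.loads(element)
            yield {key: result[key] for key in WANTED_KEYS}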

Looking more closely at the structure coming in, I realised I needed to put the data into a list in order to process it. I added the following, which allowed me to process the data for loading into BigQuery.

    class convertJson(beam.DoFn):
        def process(self, res):
            # splitlines() gives one JSON document per line; parse each into a dict
            data = [json.loads(line) for line in res.splitlines()]

            yield data
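
For completeness, here is roughly how convertJson slots into the pipeline. Since process() yields one list of row dicts per response, this sketch adds a FlatMap step (my addition, not part of the original code) to emit each dict as its own row before the BigQuery write; REMOTE_URI, HEADER, table_name1 and table_schema are assumed to be defined as above:

    def run():
        with beam.Pipeline() as p:
            data = (
                p
                | beam.Create([REMOTE_URI])
                | 'Call API ' >> beam.ParDo(callAPI(HEADER))
                | 'Convert JSON ' >> beam.ParDo(convertJson())
                # convertJson yields a list of dicts; emit each one separately
                | 'Flatten rows ' >> beam.FlatMap(lambda rows: rows)
                | 'Write to BQ ' >> beam.io.WriteToBigQuery(
                    table=table_name1,
                    schema=table_schema,
                    method="STREAMING_INSERTS",
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )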