Apache Beam 将字典加载到 BigQuery
Apache Beam load dictionaries to BigQuery
我在使用 Apache Beam 将数据加载到 BiqQuery 时遇到问题。该代码正在进行 API 调用,它返回应该是字典的行(如下所示)。我的理解是,然后我应该考虑做 json.dumps()
和 json.loads()
以便制作可以迭代的东西以传递到 BQ。但是,每当我尝试这样做时,我都会遇到无法迭代 str 的问题,因为此时它不应该是一个字符串,这让我认为某些事情没有正确处理,但我不确定是什么有可能。我对 Python 没有那么多经验,所以我不确定从这里到哪里去。
{"id":"1234","source":"example","country":"Example Country","region":"Example Region","exampleKey":"example","name":"Test","code":"null","currency":"EUR","status":1},"Detail":{"id":"1234","name":"example","code":"example","currency":"EUR"},"dateDetail":{"date":"2021-04-24","itemId":"1234"},"cost":[{"Type":"1","TypeName":"example","price":0.0}]}
{"id":"1234","source":"example","country":"Example Country","region":"Example Region","exampleKey":"example","name":"Test","code":"null","currency":"EUR","status":1},"Detail":{"id":"1234","name":"example","code":"example","currency":"EUR"},"dateDetail":{"date":"2021-04-24","itemId":"5678"},"cost":[{"Type":"1","TypeName":"example","price":0.0}]}
流水线代码如下:
class callAPI(beam.DoFn):
def __init__(self, input_header):
self.headers = input_header
# self.remote_url = input_uri
def process(self, input_uri):
try:
res = requests.get(input_uri, headers=self.headers)
res.raise_for_status()
except HTTPError as message:
logging.error(message)
return
data = json.loads(json.dumps(res.text))
yield data
def run():
with beam.Pipeline() as p:
data = ( p
| beam.Create([REMOTE_URI])
| 'Call API ' >> beam.ParDo(callAPI(HEADER))
| 'Write to BQ ' >> beam.io.WriteToBigQuery(
table = table_name1,
schema = table_schema,
method="STREAMING_INSERTS",
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED ,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND )
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
错误:
AttributeError: 'str' object has no attribute 'items' [while running 'Write to BQ /_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
尝试按照以下代码片段拆分 JSON:-
class Transaction(beam.DoFn):
def process(self, element):
result = json.loads(element)
data_id = result['id']
data_source = result['source']
data_country = result['country']
data_region = result['region']
data_exampleKey = result['exampleKey']
data_name = result['name']
data_code = result['code']
data_currency = result['currency']
data_status = result['status']
return [{"id": data_id,"source": data_source,"country": data_country,"region": data_region, "exampleKey": data_exampleKey, "name": data_name, "code": data_code, "currency": data_currency, "status": data_status}]
我仔细查看了我进入的结构,因为我意识到我需要将数据放在列表中才能处理它。我添加了以下内容,这使我能够处理数据以加载到 BigQuery 中。
class convertJson(beam.DoFn):
def process (self, res):
data_list = []
for i in res.splitlines():
data_list.append(i)
data = []
for i in data_list:
data.append(json.loads(i))
yield data
我在使用 Apache Beam 将数据加载到 BiqQuery 时遇到问题。该代码正在进行 API 调用,它返回应该是字典的行(如下所示)。我的理解是,然后我应该考虑做 json.dumps()
和 json.loads()
以便制作可以迭代的东西以传递到 BQ。但是,每当我尝试这样做时,我都会遇到无法迭代 str 的问题,因为此时它不应该是一个字符串,这让我认为某些事情没有正确处理,但我不确定是什么有可能。我对 Python 没有那么多经验,所以我不确定从这里到哪里去。
{"id":"1234","source":"example","country":"Example Country","region":"Example Region","exampleKey":"example","name":"Test","code":"null","currency":"EUR","status":1},"Detail":{"id":"1234","name":"example","code":"example","currency":"EUR"},"dateDetail":{"date":"2021-04-24","itemId":"1234"},"cost":[{"Type":"1","TypeName":"example","price":0.0}]}
{"id":"1234","source":"example","country":"Example Country","region":"Example Region","exampleKey":"example","name":"Test","code":"null","currency":"EUR","status":1},"Detail":{"id":"1234","name":"example","code":"example","currency":"EUR"},"dateDetail":{"date":"2021-04-24","itemId":"5678"},"cost":[{"Type":"1","TypeName":"example","price":0.0}]}
流水线代码如下:
class callAPI(beam.DoFn):
def __init__(self, input_header):
self.headers = input_header
# self.remote_url = input_uri
def process(self, input_uri):
try:
res = requests.get(input_uri, headers=self.headers)
res.raise_for_status()
except HTTPError as message:
logging.error(message)
return
data = json.loads(json.dumps(res.text))
yield data
def run():
with beam.Pipeline() as p:
data = ( p
| beam.Create([REMOTE_URI])
| 'Call API ' >> beam.ParDo(callAPI(HEADER))
| 'Write to BQ ' >> beam.io.WriteToBigQuery(
table = table_name1,
schema = table_schema,
method="STREAMING_INSERTS",
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED ,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND )
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
错误:
AttributeError: 'str' object has no attribute 'items' [while running 'Write to BQ /_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
尝试按照以下代码片段拆分 JSON:-
class Transaction(beam.DoFn):
def process(self, element):
result = json.loads(element)
data_id = result['id']
data_source = result['source']
data_country = result['country']
data_region = result['region']
data_exampleKey = result['exampleKey']
data_name = result['name']
data_code = result['code']
data_currency = result['currency']
data_status = result['status']
return [{"id": data_id,"source": data_source,"country": data_country,"region": data_region, "exampleKey": data_exampleKey, "name": data_name, "code": data_code, "currency": data_currency, "status": data_status}]
我仔细查看了我进入的结构,因为我意识到我需要将数据放在列表中才能处理它。我添加了以下内容,这使我能够处理数据以加载到 BigQuery 中。
class convertJson(beam.DoFn):
def process (self, res):
data_list = []
for i in res.splitlines():
data_list.append(i)
data = []
for i in data_list:
data.append(json.loads(i))
yield data