Google 数据流作业在 writeToBiqquery 步骤上失败:'list' 对象和 'str' 对象没有属性“items”
Google dataflow job failing on writeToBiqquery step : 'list' object and 'str' object has no attribute'items'
我有一个光束管道 运行ning 使用数据流 运行ner。它接受 XML 并输出一个 JSON,然后必须将其存储在 Bigquery table 中。早些时候,我使用 beam 管道将换行符分隔 JSON 写入 GCS 存储桶,并从文件中创建 BQ table 而不对其进行任何更改(使用 bigquery 控制台)。作业 运行 成功,数据顺利导入 BQ。
现在我修改了管道,以便直接将输出 JSON 行写入 BQ table。我正在使用 apache beams beam.io.WriteToBigQuery 函数。 Pcollections 是 json 个对象,其中每一行包含 BQ 的一个对象(行)。
下面是进入 WriteToBigQuery 的示例输入:
{"order_no": "1111", "order_gross_price": "74.66", "order_tax": "0.00", "order_date": "2015-10-03T23:58:15.000Z", "shipping_net_price": "5.00", "merch_net_price": "69.66", "browser_id": "Mozilla"}
{"order_no": "2222", "order_gross_price": "27.82", "order_tax": "2.12", "order_date": "2015-10-04T00:04:20.000Z", "shipping_net_price": "5.00", "merch_net_price": "20.70", "browser_id": "Mozilla"}
我的部分代码如下:
from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
def run(argv = None):
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'project_name'
google_cloud_options.job_name = 'jobid'
google_cloud_options.staging_location = 'gs://bucket/staging'
google_cloud_options.temp_location = 'gs://bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'
p = beam.Pipeline(options=options)
table_spec = 'project:dataset.table'
data = (p
| 'Create' >> beam.Create([input_file_path])
| 'GetXML' >> beam.ParDo(ReadGCSfile())
#| 'Convert2JSON' >> beam.ParDo(converttojson())
| 'COvert2json' >> beam.Map(lambda orders: json.dumps(orders))
#| beam.Map(print_row)
)
project_id = "project1"
dataset_id = 'dataset'
table_id = 'table'
table_schema = ('browser_id:STRING, merch_net_price:FLOAT, order_no:INTEGER, order_tax:FLOAT, shipping_net_price:FLOAT, order_gross_price:FLOAT, order_date:TIMESTAMP')
data| 'write' >> beam.io.WriteToBigQuery(table = table_id,dataset=dataset_id,project=project_id,schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
p.run()
我运行这个pipeline时报错如下:
AttributeError: 'list' object has no attribute 'items' [while running 'write/StreamInsertRows/ParDo(BigQueryWriteFn)']
我认为错误是由于上一步中的 return 类型造成的,或者是与执行 straming 和批量加载到 BigQuery 相关的问题。我想在 mycase 中进行批量加载。
我尝试使用 Apache BEam documentation-Writing to a bigquery table 中给出的示例插入管道
该管道有效。那里的数据形式如下:
quotes = p | beam.Create([
{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
{'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
])
如何修改我的管道,以便将我的案例中的字符串类型数据写入 bigquery table。
如果有人遇到同样的问题,请将其张贴在这里。这是一个非常微小的细节,我忽略了。
beam.io.WriteToBigquery() 将字典作为输入。我在接收器部分之前的 pcollection 返回单个元素或字符串的列表(取决于我尝试的某些版本)。
我刚刚在管道中添加了另一个步骤,使用 json.loads 将 json 字符串转换为 python 字典,行成功加载到 BQ。
我有一个光束管道 运行ning 使用数据流 运行ner。它接受 XML 并输出一个 JSON,然后必须将其存储在 Bigquery table 中。早些时候,我使用 beam 管道将换行符分隔 JSON 写入 GCS 存储桶,并从文件中创建 BQ table 而不对其进行任何更改(使用 bigquery 控制台)。作业 运行 成功,数据顺利导入 BQ。
现在我修改了管道,以便直接将输出 JSON 行写入 BQ table。我正在使用 apache beams beam.io.WriteToBigQuery 函数。 Pcollections 是 json 个对象,其中每一行包含 BQ 的一个对象(行)。
下面是进入 WriteToBigQuery 的示例输入:
{"order_no": "1111", "order_gross_price": "74.66", "order_tax": "0.00", "order_date": "2015-10-03T23:58:15.000Z", "shipping_net_price": "5.00", "merch_net_price": "69.66", "browser_id": "Mozilla"}
{"order_no": "2222", "order_gross_price": "27.82", "order_tax": "2.12", "order_date": "2015-10-04T00:04:20.000Z", "shipping_net_price": "5.00", "merch_net_price": "20.70", "browser_id": "Mozilla"}
我的部分代码如下:
from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
def run(argv = None):
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'project_name'
google_cloud_options.job_name = 'jobid'
google_cloud_options.staging_location = 'gs://bucket/staging'
google_cloud_options.temp_location = 'gs://bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'
p = beam.Pipeline(options=options)
table_spec = 'project:dataset.table'
data = (p
| 'Create' >> beam.Create([input_file_path])
| 'GetXML' >> beam.ParDo(ReadGCSfile())
#| 'Convert2JSON' >> beam.ParDo(converttojson())
| 'COvert2json' >> beam.Map(lambda orders: json.dumps(orders))
#| beam.Map(print_row)
)
project_id = "project1"
dataset_id = 'dataset'
table_id = 'table'
table_schema = ('browser_id:STRING, merch_net_price:FLOAT, order_no:INTEGER, order_tax:FLOAT, shipping_net_price:FLOAT, order_gross_price:FLOAT, order_date:TIMESTAMP')
data| 'write' >> beam.io.WriteToBigQuery(table = table_id,dataset=dataset_id,project=project_id,schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
p.run()
我运行这个pipeline时报错如下:
AttributeError: 'list' object has no attribute 'items' [while running 'write/StreamInsertRows/ParDo(BigQueryWriteFn)']
我认为错误是由于上一步中的 return 类型造成的,或者是与执行 straming 和批量加载到 BigQuery 相关的问题。我想在 mycase 中进行批量加载。 我尝试使用 Apache BEam documentation-Writing to a bigquery table 中给出的示例插入管道 该管道有效。那里的数据形式如下:
quotes = p | beam.Create([
{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
{'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
])
如何修改我的管道,以便将我的案例中的字符串类型数据写入 bigquery table。
如果有人遇到同样的问题,请将其张贴在这里。这是一个非常微小的细节,我忽略了。 beam.io.WriteToBigquery() 将字典作为输入。我在接收器部分之前的 pcollection 返回单个元素或字符串的列表(取决于我尝试的某些版本)。 我刚刚在管道中添加了另一个步骤,使用 json.loads 将 json 字符串转换为 python 字典,行成功加载到 BQ。