数据流 - BigQuery 自动检测?
Dataflow - BigQuery autodetect?
我在 Dataflow 中遇到问题。我使用了 Python BigQuery API,它与自动检测配合使用效果很好。它 运行 很好,job_config 创建 table 并同时附加值:
...
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.create_disposition = 'CREATE_IF_NEEDED',
job_config.source_format = 'CSV',
job_config.field_delimiter = '|',
job_config.skip_leading_rows = 1
load_job = client.load_table_from_uri(
uri,
table_id,
location=region,
job_config=job_config,)
所以,我需要使用数据流来扩展从 gcs 到 BigQuery 的传输,但是,我没有找到关于如何在 writetobigquery apache beam 中使用自动检测的资源。
class DataIngestion:
def parse_method(self, string_input):
values = re.split(",",
re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('date', 'volume', 'open', 'close', 'high', 'low', 'adjclose'),
values))
return row
...
(p
| 'Read from a File' >> beam.io.ReadFromText(known_args.input,
skip_header_lines=1)
| 'String To BigQuery Row' >>
beam.Map(lambda s: data_ingestion.parse_method(s))
#| 'Parse file' >> beam.Map(parse_file)
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
known_args.output,
custom_gcs_temp_location="gs://XXXX",
create_disposition = "CREATE_IF_NEEDED",
write_disposition = "WRITE_APPEND"))
p.run().wait_until_finish()
所以,我也有 parse_method,但我试图 运行 return 只有没有字典的值...
尝试将 schema='SCHEMA_AUTODETECT'
传递给 PTransform。那应该启用它。
我在 Dataflow 中遇到问题。我使用了 Python BigQuery API,它与自动检测配合使用效果很好。它 运行 很好,job_config 创建 table 并同时附加值:
...
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.create_disposition = 'CREATE_IF_NEEDED',
job_config.source_format = 'CSV',
job_config.field_delimiter = '|',
job_config.skip_leading_rows = 1
load_job = client.load_table_from_uri(
uri,
table_id,
location=region,
job_config=job_config,)
所以,我需要使用数据流来扩展从 gcs 到 BigQuery 的传输,但是,我没有找到关于如何在 writetobigquery apache beam 中使用自动检测的资源。
class DataIngestion:
def parse_method(self, string_input):
values = re.split(",",
re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('date', 'volume', 'open', 'close', 'high', 'low', 'adjclose'),
values))
return row
...
(p
| 'Read from a File' >> beam.io.ReadFromText(known_args.input,
skip_header_lines=1)
| 'String To BigQuery Row' >>
beam.Map(lambda s: data_ingestion.parse_method(s))
#| 'Parse file' >> beam.Map(parse_file)
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
known_args.output,
custom_gcs_temp_location="gs://XXXX",
create_disposition = "CREATE_IF_NEEDED",
write_disposition = "WRITE_APPEND"))
p.run().wait_until_finish()
所以,我也有 parse_method,但我试图 运行 return 只有没有字典的值...
尝试将 schema='SCHEMA_AUTODETECT'
传递给 PTransform。那应该启用它。