使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS
Write BigQuery results to GCS in CSV format using Apache Beam
我是 Apache Beam 的新手,我正在尝试编写一个管道以从 Google BigQuery 中提取数据并使用 Python 将数据以 CSV 格式写入 GCS。
使用 beam.io.read(beam.io.BigQuerySource())
我可以从 BigQuery 读取数据,但不确定如何以 CSV 格式将其写入 GCS。
是否有实现相同功能的自定义函数,您能帮帮我吗?
import logging
import apache_beam as beam
from apache_beam.io.BigQueryDisposition import CREATE_IF_NEEDED
from apache_beam.io.BigQueryDisposition import WRITE_TRUNCATE
PROJECT='project_id'
BUCKET='project_bucket'
def run():
argv = [
'--project={0}'.format(PROJECT),
'--job_name=readwritebq',
'--save_main_session',
'--staging_location=gs://{0}/staging/'.format(BUCKET),
'--temp_location=gs://{0}/staging/'.format(BUCKET),
'--runner=DataflowRunner'
]
with beam.Pipeline(argv=argv) as p:
# Execute the SQL in big query and store the result data set into given Destination big query table.
BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query = 'Select * from `dataset.table`', use_standard_sql=True))
# Extract data from Bigquery to GCS in CSV format.
# This is where I need your help
BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
table='tablename',
dataset='datasetname',
project='project_id',
schema='name:string,gender:string,count:integer',
create_disposition=CREATE_IF_NEEDED,
write_disposition=WRITE_TRUNCATE)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
您可以使用 WriteToText
to add a .csv
suffix and headers
. Take into account that you'll need to parse the query results to CSV format. As an example, I used the Shakespeare public dataset 和以下查询:
SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10
我们现在读取查询结果:
BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query=query, use_standard_sql=True))
BQ_DATA
现在包含 key-value 对:
{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}
我们可以应用 beam.Map
函数来仅产生值:
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())
BQ_VALUES
的摘录:
[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
最后再次映射,所有列值用逗号分隔而不是列表(考虑到如果双引号可以出现在字段中,您需要转义):
BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))
现在我们将结果写入GCS,后缀为headers:
BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')
书面成绩:
$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"
对于使用 Python 3 寻找更新的任何人,请替换
行
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())
和
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: list(x.values()))
我是 Apache Beam 的新手,我正在尝试编写一个管道以从 Google BigQuery 中提取数据并使用 Python 将数据以 CSV 格式写入 GCS。
使用 beam.io.read(beam.io.BigQuerySource())
我可以从 BigQuery 读取数据,但不确定如何以 CSV 格式将其写入 GCS。
是否有实现相同功能的自定义函数,您能帮帮我吗?
import logging
import apache_beam as beam
from apache_beam.io.BigQueryDisposition import CREATE_IF_NEEDED
from apache_beam.io.BigQueryDisposition import WRITE_TRUNCATE
PROJECT='project_id'
BUCKET='project_bucket'
def run():
argv = [
'--project={0}'.format(PROJECT),
'--job_name=readwritebq',
'--save_main_session',
'--staging_location=gs://{0}/staging/'.format(BUCKET),
'--temp_location=gs://{0}/staging/'.format(BUCKET),
'--runner=DataflowRunner'
]
with beam.Pipeline(argv=argv) as p:
# Execute the SQL in big query and store the result data set into given Destination big query table.
BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query = 'Select * from `dataset.table`', use_standard_sql=True))
# Extract data from Bigquery to GCS in CSV format.
# This is where I need your help
BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
table='tablename',
dataset='datasetname',
project='project_id',
schema='name:string,gender:string,count:integer',
create_disposition=CREATE_IF_NEEDED,
write_disposition=WRITE_TRUNCATE)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
您可以使用 WriteToText
to add a .csv
suffix and headers
. Take into account that you'll need to parse the query results to CSV format. As an example, I used the Shakespeare public dataset 和以下查询:
SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10
我们现在读取查询结果:
BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query=query, use_standard_sql=True))
BQ_DATA
现在包含 key-value 对:
{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}
我们可以应用 beam.Map
函数来仅产生值:
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())
BQ_VALUES
的摘录:
[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
最后再次映射,所有列值用逗号分隔而不是列表(考虑到如果双引号可以出现在字段中,您需要转义):
BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))
现在我们将结果写入GCS,后缀为headers:
BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')
书面成绩:
$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"
对于使用 Python 3 寻找更新的任何人,请替换
行BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())
和
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: list(x.values()))