Reading a few rows from BigQuery as a side input, getting None
I'm having trouble with side inputs (specifically from BigQuery) in my Dataflow pipeline, even after going through the Coursera course and looking at examples.
Right now, I have a pipeline that reads files in a GCS bucket, gets their filenames, then transforms the files and writes a given number of rows to BigQuery. I'm trying to figure out how to map each filename to a specific "key" from BQ.
result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
result = reduce(add, result)
#create each input PCollection name
variables = ['p{}'.format(i) for i in range(len(result))]
From the result I build a tuple with all the filenames (filename1, filename2, ...) and dynamically create a query like:
Bqquery = "SELECT FILENAME, FILE_ID from 'project:dataset.table' where FILENAME IN (filename tuple)"
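For reference, a rough sketch of how that filename tuple and query string could be built (illustrative only; project.dataset.table is a placeholder, not the real table):

# Hypothetical sketch: build an IN (...) list from the matched GCS files
file_names = [m.path.split('/')[-1] for m in result]
in_list = ", ".join("'{}'".format(name) for name in file_names)
bqquery = ("SELECT FILENAME, FILE_ID FROM `project.dataset.table` "
           "WHERE FILENAME IN ({})".format(in_list))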
I figured I'd do it this way because there are roughly 20 files at a time, so it made more sense to fetch the data from BQ once rather than fetching each file_id inside the for loop.
So I did:
Bqcollection = p | 'Get File_Id' >> beam.io.Read(beam.io.BigQuerySource(query=bqquery))
But the result I get is None.
for i in range(len(result)):
    current_file = result[i].path
    # query inside the for loop
    # bqquery = "SELECT FILE_ID from 'project:dataset.table' where FILENAME = '{0}'".format(current_file)
    # file_id = p | 'GetFile_id_{0}'.format(i) >> beam.io.Read(beam.io.BigQuerySource(query=bqquery))
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), current_file)
I also tried running the query inside the for loop so I would only fetch one filename at a time (see the commented-out code), but that didn't work either. Ultimately, what I want to do is change beam.ParDo(AddFilenamesFn(), current_file) to beam.ParDo(AddFilenamesFn(), file_id), so that instead of adding the actual filename I add the file_id.
[Note that the labels mentioned in the code (i.e. read_labels[i]) are just labels for Dataflow.]
I think I'm missing something pretty basic about PCollections, but I'm not sure what.
Taking into account the code from your previous question, I think the easiest solution is to run the query inside the AddFilenamesFn ParDo within the for loop. Keep in mind that beam.io.Read(beam.io.BigQuerySource(query=bqquery)) is used to read rows as a source, not as an intermediate step. So, in the case I propose, you can use the Python client library directly (google-cloud-bigquery>0.27.0):
class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with file id (retrieved from BigQuery) and row"""
    def process(self, element, file_path):
        from google.cloud import bigquery
        client = bigquery.Client()
        file_name = file_path.split("/")[-1]
        query_job = client.query("""
            SELECT FILE_ID
            FROM test.file_mapping
            WHERE FILENAME = '{0}'
            LIMIT 1""".format(file_name))
        results = query_job.result()
        for row in results:
            file_id = row.FILE_ID
        yield {'filename':file_id, 'row':element}
This would be the most straightforward solution to implement, but it might come with an issue: we run one query per line/record instead of running all ~20 possible queries at the start of the pipeline. For example, if a single file has 3,000 elements, the same query will be launched 3,000 times. However, each distinct query should actually run only once, and subsequent query "repeats" will hit the cache. Also note that cached queries do not contribute towards the interactive query limit.
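If the per-element queries still turn out to be a concern, one possible mitigation (a hedged sketch, not part of the original solution; the class name is made up) is to cache the lookup inside the DoFn so that each worker only queries BigQuery once per distinct file:

class CachedAddFilenamesFn(beam.DoFn):
    """Variant of AddFilenamesFn that memoizes the FILE_ID lookup per worker."""
    def __init__(self):
        # filled lazily on the worker: file name -> FILE_ID
        self._cache = {}

    def process(self, element, file_path):
        from google.cloud import bigquery
        file_name = file_path.split("/")[-1]
        if file_name not in self._cache:
            client = bigquery.Client()
            query_job = client.query("""
                SELECT FILE_ID
                FROM test.file_mapping
                WHERE FILENAME = '{0}'
                LIMIT 1""".format(file_name))
            file_id = None
            for row in query_job.result():
                file_id = row.FILE_ID
            self._cache[file_name] = file_id
        yield {'filename': self._cache[file_name], 'row': element}

Since the cache lives on the DoFn instance, it is per worker rather than global, but that is already enough to collapse the 3,000 repeats into one query per file per worker.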
I used the same files as before:
$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain
$ gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france
and added a new table:
bq mk test.file_mapping FILENAME:STRING,FILE_ID:STRING
bq query --use_legacy_sql=false 'INSERT INTO test.file_mapping (FILENAME, FILE_ID) values ("countries1.csv", "COUNTRIES ONE"), ("countries2.csv", "COUNTRIES TWO")'
The output is:
INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'id,country'}
INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'1,sweden'}
INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'2,spain'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'id,country'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'3,italy'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'4,france'}
A different solution would be to load the whole table with beam.io.BigQuerySource() and materialize it as a side input (which could of course be problematic depending on its size), or, as you say, break it down into N queries and save each one into a different side input. Then you could select the appropriate record for each element and pass it as an additional input to AddFilenamesFn. It would be interesting to try writing that version, too.
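For reference, here is a minimal sketch of that side-input variant, assuming the same test.file_mapping table; the step names and the AddFileIdFn helper are made up for illustration:

# Read the whole mapping table once and turn it into (FILENAME, FILE_ID) pairs
mapping = (p
           | 'Read file mapping' >> beam.io.Read(beam.io.BigQuerySource(
                 query='SELECT FILENAME, FILE_ID FROM test.file_mapping',
                 use_standard_sql=True))
           | 'To KV' >> beam.Map(lambda row: (row['FILENAME'], row['FILE_ID'])))

class AddFileIdFn(beam.DoFn):
    """Looks up the FILE_ID in the side-input dict instead of querying BigQuery."""
    def process(self, element, file_path, name_to_id):
        file_name = file_path.split("/")[-1]
        yield {'filename': name_to_id.get(file_name), 'row': element}

# Inside the for loop, pass the materialized mapping as a side input:
#   ... | add_filename_labels[i] >> beam.ParDo(
#             AddFileIdFn(), result[i].path, beam.pvalue.AsDict(mapping))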
Full code of the first solution I proposed:
import argparse, logging
from operator import add
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
class GCSFileReader:
    """Helper class to read gcs files"""
    def __init__(self, gcs):
        self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with file id (retrieved from BigQuery) and row"""
    def process(self, element, file_path):
        from google.cloud import bigquery
        client = bigquery.Client()
        file_name = file_path.split("/")[-1]
        query_job = client.query("""
            SELECT FILE_ID
            FROM test.file_mapping
            WHERE FILENAME = '{0}'
            LIMIT 1""".format(file_name))
        results = query_job.result()
        for row in results:
            file_id = row.FILE_ID
        yield {'filename':file_id, 'row':element}

# just logging output to visualize results
def write_res(element):
    logging.info(element)
    return element

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    gcs = GCSFileSystem(PipelineOptions(pipeline_args))
    gcs_reader = GCSFileReader(gcs)

    # in my case I am looking for files that start with 'countries'
    BUCKET = 'BUCKET_NAME'
    result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
    result = reduce(add, result)

    # create each input PCollection name and unique step labels
    variables = ['p{}'.format(i) for i in range(len(result))]
    read_labels = ['Read file {}'.format(i) for i in range(len(result))]
    add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

    # load each input file into a separate PCollection and add filename to each row
    for i in range(len(result)):
        globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

    # flatten all PCollections into a single one
    merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

    p.run()

if __name__ == '__main__':
    run()