Dataflow/apache beam - how to access current filename when passing in pattern?
I have seen this question answered on Stack Overflow before, but not since apache beam added splittable dofn functionality for python. How would I access the filename of the current file being processed when passing in a file pattern to a gcs bucket?
I want to pass the filename into my transform function:
with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText('gs://url to file')
    data = (
        lines
        | 'Jsonify' >> beam.Map(jsonify)
        | 'Unnest' >> beam.FlatMap(unnest)
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    )
Ultimately, what I want to do is pass the filename into my transform function as I transform each row of the json (see this and then use the filename to do a lookup in a different BQ table to get a value). I think once I manage to figure out how to get the filename, I will be able to figure out the side input part in order to do the lookup in the bq table and get the unique value.
I tried to implement a solution with the previously cited case. There, as well as in other approaches such as this one, they also get a list of file names but load the whole file into a single element, which might not scale well with large files. Therefore, I looked into adding the filename to each record.
As input I used two csv files:
$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain
$ gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france
Using GCSFileSystem.match we can access metadata_list to retrieve FileMetadata containing the file path and size in bytes. In my case:
[FileMetadata(gs://BUCKET_NAME/countries1.csv, 29),
FileMetadata(gs://BUCKET_NAME/countries2.csv, 29)]
The code is:
result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
We will read each of the matching files into a different PCollection. As we don't know the number of files a priori, we need to create programmatically a list of names for each PCollection (p0, p1, ..., pN-1) and ensure that we have unique labels for each step ('Read file 0', 'Read file 1', etc.):
variables = ['p{}'.format(i) for i in range(len(result))]
read_labels = ['Read file {}'.format(i) for i in range(len(result))]
add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]
Then we proceed to read each different file into its corresponding PCollection with ReadFromText and call the AddFilenamesFn ParDo to associate each record with the filename.
for i in range(len(result)):
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)
where AddFilenamesFn is:
class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        yield {'filename': file_name, 'row': element}
My first approach was to use a Map function directly, which results in simpler code. However, result[i].path is resolved at the end of the loop (Python's late-binding closures) and each record is incorrectly mapped to the last file of the list (a possible workaround is sketched after the snippet below):
globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
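For reference, a minimal sketch (not part of the original answer) of how the Map version could be made to work: binding the path as a default argument of the lambda captures its value per loop iteration instead of at the end of the loop.
# Hypothetical variant of the loop above: the default argument 'path' is
# evaluated when the lambda is defined, so each iteration keeps its own file.
for i in range(len(result)):
    globals()[variables[i]] = (
        p
        | read_labels[i] >> ReadFromText(result[i].path)
        | add_filename_labels[i] >> beam.Map(
            lambda elem, path=result[i].path: (path, elem))
    )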
Finally, we flatten all the PCollections into one:
merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten()
and we check the results by logging the elements:
INFO:root:{'filename': u'countries2.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries2.csv', 'row': u'3,italy'}
INFO:root:{'filename': u'countries2.csv', 'row': u'4,france'}
INFO:root:{'filename': u'countries1.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries1.csv', 'row': u'1,sweden'}
INFO:root:{'filename': u'countries1.csv', 'row': u'2,spain'}
I tested this with both the DirectRunner and the DataflowRunner for Python SDK 2.8.0.
I hope this addresses the main issue here and you can continue by integrating BigQuery into your full use case now. You might need to use the Python Client Library for that; I wrote a similar Java example. A sketch of one possible side-input approach is included after the full code below.
Full code:
import argparse, logging
from operator import add

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
    """Helper class to read gcs files"""
    def __init__(self, gcs):
        self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        # yield (file_name, element) # use this to return a tuple instead
        yield {'filename': file_name, 'row': element}

# just logging output to visualize results
def write_res(element):
    logging.info(element)
    return element

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    gcs = GCSFileSystem(PipelineOptions(pipeline_args))
    gcs_reader = GCSFileReader(gcs)

    # in my case I am looking for files that start with 'countries'
    BUCKET = 'BUCKET_NAME'
    result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
    result = reduce(add, result)  # built-in in Python 2; on Python 3 use functools.reduce

    # create each input PCollection name and unique step labels
    variables = ['p{}'.format(i) for i in range(len(result))]
    read_labels = ['Read file {}'.format(i) for i in range(len(result))]
    add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

    # load each input file into a separate PCollection and add filename to each row
    for i in range(len(result)):
        # globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
        globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

    # flatten all PCollections into a single one
    merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

    p.run()

if __name__ == '__main__':
    run()
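As a rough, hypothetical sketch of the side-input lookup mentioned earlier (the mapping table, its columns and the enrich helper are made-up names, and it uses Beam's built-in BigQuerySource rather than the client library), the merged PCollection could be enriched per filename roughly like this:
# Hypothetical sketch: read a {filename -> value} mapping from BigQuery and
# attach the looked-up value to each record produced by AddFilenamesFn.
# 'p' and 'merged' are the pipeline and flattened PCollection from above.
mapping = (
    p
    | 'Read mapping' >> beam.io.Read(beam.io.BigQuerySource(
        query='SELECT filename, value FROM dataset_id.mapping_table'))
    | 'To KV' >> beam.Map(lambda row: (row['filename'], row['value']))
)

def enrich(element, lookup):
    # 'lookup' is the side input dict built from the mapping PCollection
    element['lookup_value'] = lookup.get(element['filename'])
    return element

enriched = merged | 'Enrich with lookup' >> beam.Map(
    enrich, lookup=beam.pvalue.AsDict(mapping))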
I had to read some metadata files and use the filename for further processing. I was struggling until I finally came across apache_beam.io.ReadFromTextWithFilename:
def run(argv=None, save_main_session=True):
    import argparse, json, typing  # argparse and json are needed below

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.io import ReadFromTextWithFilename

    class ExtractMetaData(beam.DoFn):
        def process(self, element):
            # ReadFromTextWithFilename emits (filename, line) tuples
            filename, meta = element
            image_name = filename.split("/")[-2]
            labels = json.loads(meta)["labels"]
            image = {"image_name": image_name, "labels": labels}
            print(image)
            yield image  # yield the dict as one element; returning it would emit only its keys

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        meta = (
            pipeline
            | "Read Metadata" >> ReadFromTextWithFilename(f'gs://{BUCKET}/dev-set/**/*metadata.json')
            | beam.ParDo(ExtractMetaData())
        )
    # pipeline.run() is not needed here: the with-block already runs the pipeline
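For the pipeline in the original question, a minimal sketch of how this could fit (assuming jsonify can be adapted to accept the filename that ReadFromTextWithFilename emits alongside each line):
# Hypothetical adaptation of the question's pipeline: each element read by
# ReadFromTextWithFilename is a (filename, line) tuple, so the filename can
# be threaded through the transforms.
import apache_beam as beam
from apache_beam.io import ReadFromTextWithFilename

with beam.Pipeline(options=pipeline_options) as p:
    data = (
        p
        | 'Read' >> ReadFromTextWithFilename('gs://url to file')
        | 'Jsonify' >> beam.Map(lambda kv: jsonify(kv[1], filename=kv[0]))
        | 'Unnest' >> beam.FlatMap(unnest)
    )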