如何使用 PCollection 在 Beam 管道中构建数据存储查询?
How to use a PCollection to build datastore query in a beam pipeline?
我正在按照一些规则构建将 process/transform csv 文件转换为 xml 文件的光束管道。到目前为止,我的方法是在管道的开头按行拆分输入的 csv 文件,并将每一行输入管道。在管道中,将每一行转换为 xml 标记,在管道的末尾,我将所有内容全局合并到最终的 xml 文件中。现在的问题是我需要存储在 Google 数据存储中的一些额外信息来为 csv 文件中的每一行构建 xml 标记,我不知道该怎么做,因为查询到从数据存储中检索数据是一个运行时参数 (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore),查询取决于 PCollection。我需要构建这样的查询:
Select from xxx where id in PCollection
有没有办法做到这一点?像 combineGlobally 这样的东西来构建查询,然后以某种方式将查询传递给 ReadFromDatastore 函数?或者有什么方法可以做我需要的吗?
我现在有这样的东西:
with beam.Pipeline(options=pipeline_options) as p:
items = (
p |
'ReadCsvFile' >> beam.io.Read(CsvFileSource(input_name))
'PrepareToJoin' >> beam.ParDo(PrepareToJoin())
)
datastore_items = (p |
'DatastoreDataP' >> ReadFromDatastore(project_id, query)
)
new_items = (
{'data': items, 'datastore': datastore_items} |
'JoinWithDatastore' >> beam.CoGroupByKey() |
'PostJoinProcess' >> beam.ParDo(PostJoinProcess())
)
xml_file = (new_items |
'ItemToXmlTag' >> beam.ParDo(ItemToXmlTag()) |
'MakeXMLFile' >> beam.CombineGlobally(XMLCombineFn()) |
WriteToText(output_name)
)
如您所见,查询是管道中的一个参数。
在此先感谢您的帮助!
您可以使用 ParDo 来实现此目的。下面是如何编写 ParDo 的伪代码,其中您处理 PCollection 的每个元素,然后进行 DataStore 查找
class EnrichEntities(beam.DoFn):
"""Updates Datastore entity"""
def process(self, element):
key = client.key('Task', 'sample_task')
task = client.get(element.key)
return [element]
else:
return [element]
然后在您的管道中,您可以使用如下所示的 ParDo
with beam.Pipeline() as pipeline:
results = (
pipeline
| 'Create inputs' >> beam.Create()
| 'DoFn methods' >> beam.ParDo(EnrichEntities())
| beam.Map(print)
要提高查找性能,您还可以使用 BagState
,您可以一次查找 N 条记录。这是一个 link 来实现这个 https://beam.apache.org/blog/2017/08/28/timely-processing.html
我正在按照一些规则构建将 process/transform csv 文件转换为 xml 文件的光束管道。到目前为止,我的方法是在管道的开头按行拆分输入的 csv 文件,并将每一行输入管道。在管道中,将每一行转换为 xml 标记,在管道的末尾,我将所有内容全局合并到最终的 xml 文件中。现在的问题是我需要存储在 Google 数据存储中的一些额外信息来为 csv 文件中的每一行构建 xml 标记,我不知道该怎么做,因为查询到从数据存储中检索数据是一个运行时参数 (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore),查询取决于 PCollection。我需要构建这样的查询:
Select from xxx where id in PCollection
有没有办法做到这一点?像 combineGlobally 这样的东西来构建查询,然后以某种方式将查询传递给 ReadFromDatastore 函数?或者有什么方法可以做我需要的吗?
我现在有这样的东西:
with beam.Pipeline(options=pipeline_options) as p:
items = (
p |
'ReadCsvFile' >> beam.io.Read(CsvFileSource(input_name))
'PrepareToJoin' >> beam.ParDo(PrepareToJoin())
)
datastore_items = (p |
'DatastoreDataP' >> ReadFromDatastore(project_id, query)
)
new_items = (
{'data': items, 'datastore': datastore_items} |
'JoinWithDatastore' >> beam.CoGroupByKey() |
'PostJoinProcess' >> beam.ParDo(PostJoinProcess())
)
xml_file = (new_items |
'ItemToXmlTag' >> beam.ParDo(ItemToXmlTag()) |
'MakeXMLFile' >> beam.CombineGlobally(XMLCombineFn()) |
WriteToText(output_name)
)
如您所见,查询是管道中的一个参数。
在此先感谢您的帮助!
您可以使用 ParDo 来实现此目的。下面是如何编写 ParDo 的伪代码,其中您处理 PCollection 的每个元素,然后进行 DataStore 查找
class EnrichEntities(beam.DoFn):
"""Updates Datastore entity"""
def process(self, element):
key = client.key('Task', 'sample_task')
task = client.get(element.key)
return [element]
else:
return [element]
然后在您的管道中,您可以使用如下所示的 ParDo
with beam.Pipeline() as pipeline:
results = (
pipeline
| 'Create inputs' >> beam.Create()
| 'DoFn methods' >> beam.ParDo(EnrichEntities())
| beam.Map(print)
要提高查找性能,您还可以使用 BagState
,您可以一次查找 N 条记录。这是一个 link 来实现这个 https://beam.apache.org/blog/2017/08/28/timely-processing.html