如何使用 PCollection 在 Beam 管道中构建数据存储查询?

How to use a PCollection to build datastore query in a beam pipeline?

我正在按照一些规则构建将 process/transform csv 文件转换为 xml 文件的光束管道。到目前为止,我的方法是在管道的开头按行拆分输入的 csv 文件,并将每一行输入管道。在管道中,将每一行转换为 xml 标记,在管道的末尾,我将所有内容全局合并到最终的 xml 文件中。现在的问题是我需要存储在 Google 数据存储中的一些额外信息来为 csv 文件中的每一行构建 xml 标记,我不知道该怎么做,因为查询到从数据存储中检索数据是一个运行时参数 (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore),查询取决于 PCollection。我需要构建这样的查询:

Select from xxx where id in PCollection

有没有办法做到这一点?像 combineGlobally 这样的东西来构建查询,然后以某种方式将查询传递给 ReadFromDatastore 函数?或者有什么方法可以做我需要的吗?

我现在有这样的东西:

with beam.Pipeline(options=pipeline_options) as p:
    items = (
        p |
        'ReadCsvFile' >> beam.io.Read(CsvFileSource(input_name))
        'PrepareToJoin' >> beam.ParDo(PrepareToJoin())
    )

    datastore_items = (p |
        'DatastoreDataP' >> ReadFromDatastore(project_id, query)
    )

    new_items = (
        {'data': items, 'datastore': datastore_items} |
        'JoinWithDatastore' >> beam.CoGroupByKey() |
        'PostJoinProcess' >> beam.ParDo(PostJoinProcess())
    )

    xml_file = (new_items |
        'ItemToXmlTag' >> beam.ParDo(ItemToXmlTag()) |
        'MakeXMLFile' >> beam.CombineGlobally(XMLCombineFn()) |
        WriteToText(output_name)
    )

如您所见,查询是管道中的一个参数。

在此先感谢您的帮助!

您可以使用 ParDo 来实现此目的。下面是如何编写 ParDo 的伪代码,其中您处理 PCollection 的每个元素,然后进行 DataStore 查找


class EnrichEntities(beam.DoFn):
"""Updates Datastore entity"""
def process(self, element):
    key = client.key('Task', 'sample_task')
    task = client.get(element.key)

        return [element]
    else:
        return [element]

然后在您的管道中,您可以使用如下所示的 ParDo

with beam.Pipeline() as pipeline:
  results = (
      pipeline
      | 'Create inputs' >> beam.Create()
      | 'DoFn methods' >> beam.ParDo(EnrichEntities())
      | beam.Map(print)

要提高查找性能,您还可以使用 BagState,您可以一次查找 N 条记录。这是一个 link 来实现这个 https://beam.apache.org/blog/2017/08/28/timely-processing.html