如何使用 Apache Beam 将 1 个文本文件的内容拆分为不同的 PCollections

How to split content of 1 text file into different PCollections using Apache Beam

我刚开始使用 Python 在 Apache Beam 上学习,并在这个问题上坚持了一段时间,希望能得到任何擅长 Apache Beam 的人的帮助。

这是我的问题陈述:

我有一个如下所示的文本文件:

BEGIN=burger
blue
lettuce
mayonise 
END=burger
BEGIN=fish
green
strawberry
ketchup
END=fish

我可以知道如何使用 apache beam 将汉堡和鱼分成不同的 PCollections 以便我可以对这 2 个 PCollections 执行不同的操作吗?

这里附上我在 Python

中的代码片段
import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter
from apache_beam.io import ReadFromText

class SplitRow(beam.DoFn):
  def process(self,element):
    return element.splitlines()


def ExtractBurger(element):
    if element == "BEGIN=burger":
        return element

p = beam.Pipeline()
squares = (
    p 
#     | "Read From Text" >> ReadFromText("gs://abc.txt")
    | "Create dummy text file" >> Create([
          'BEGIN=burger',
          'blue',
          'lettuce',
          'mayonise',
          'END=burger',
          'BEGIN=fish',
          'green',
          'strawberry',
          'ketchup',
          'END=fish',
      ])
    | "Decode and split lines" >> ParDo(SplitRow())
    | "Extract out Burger" >> Filter(ExtractBurger)
    | Map(print)
)
p.run()

我的输出是这样的

BEGIN=burger

我能够提取出包含“BEGIN=burger”的行,但我真正想要的是将“BEGIN=burger”到“END=burger”之间的所有数据提取到 1 个 PCollection 和“BEGIN”中=fish" 到 "END=fish" 到另一个 PCollection,不确定是否可以这样做,因为我觉得 Apache Beam 只能进行行操作,我如何编写一个逻辑来做这样的事情

  1. 如果找到 BEGIN=burger
  2. 继续循环遍历下一行,直到找到 END=burger
  3. 取出整个部分并将其写入 PCollection

如果有人能提供一些见解,我们将不胜感激!谢谢!

Beam 并行处理元素。所以不能保证它会按原来的顺序逐行处理。

为此,您必须使用状态 (https://beam.apache.org/blog/stateful-processing/) 来记录当前处理是否在 BEGIN 和 END 之间。并且您必须确保 Beam 及其运行器(无论您选择哪个运行器)的并行度为 1,以便它不会并行处理元素。 但这违背了使用 Beam 的目的。

  • 如果您无法更改文件:您可以编写一个 Python 脚本来执行此操作。

  • 如果您可以更改生成文件的行为:您可以为“BEGIN”和“END”之间的每一行指定一个 uuid。而且您的文件甚至不需要包含原始顺序的行。 例如:

       'burger=blue',
       'burger=lettuce',
       'burger=mayonise',
       'fish=green',
       'fish=strawberry',
       'fish=ketchup',
       'burger=pickle',
       'fish=chips',
    

然后您可以并行处理所有行,将它们解析为 {key}={value},并按键分组到一个 PCollection 中,其中包含进一步转换的所有内容。