如何使用 Apache Beam 将 1 个文本文件的内容拆分为不同的 PCollections
How to split content of 1 text file into different PCollections using Apache Beam
我刚开始使用 Python 在 Apache Beam 上学习,并在这个问题上坚持了一段时间,希望能得到任何擅长 Apache Beam 的人的帮助。
这是我的问题陈述:
我有一个如下所示的文本文件:
BEGIN=burger
blue
lettuce
mayonise
END=burger
BEGIN=fish
green
strawberry
ketchup
END=fish
我可以知道如何使用 apache beam 将汉堡和鱼分成不同的 PCollections 以便我可以对这 2 个 PCollections 执行不同的操作吗?
这里附上我在 Python
中的代码片段
import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter
from apache_beam.io import ReadFromText
class SplitRow(beam.DoFn):
def process(self,element):
return element.splitlines()
def ExtractBurger(element):
if element == "BEGIN=burger":
return element
p = beam.Pipeline()
squares = (
p
# | "Read From Text" >> ReadFromText("gs://abc.txt")
| "Create dummy text file" >> Create([
'BEGIN=burger',
'blue',
'lettuce',
'mayonise',
'END=burger',
'BEGIN=fish',
'green',
'strawberry',
'ketchup',
'END=fish',
])
| "Decode and split lines" >> ParDo(SplitRow())
| "Extract out Burger" >> Filter(ExtractBurger)
| Map(print)
)
p.run()
我的输出是这样的
BEGIN=burger
我能够提取出包含“BEGIN=burger”的行,但我真正想要的是将“BEGIN=burger”到“END=burger”之间的所有数据提取到 1 个 PCollection 和“BEGIN”中=fish" 到 "END=fish" 到另一个 PCollection,不确定是否可以这样做,因为我觉得 Apache Beam 只能进行行操作,我如何编写一个逻辑来做这样的事情
- 如果找到 BEGIN=burger
- 继续循环遍历下一行,直到找到 END=burger
- 取出整个部分并将其写入 PCollection
如果有人能提供一些见解,我们将不胜感激!谢谢!
Beam 并行处理元素。所以不能保证它会按原来的顺序逐行处理。
为此,您必须使用状态 (https://beam.apache.org/blog/stateful-processing/) 来记录当前处理是否在 BEGIN 和 END 之间。并且您必须确保 Beam 及其运行器(无论您选择哪个运行器)的并行度为 1,以便它不会并行处理元素。
但这违背了使用 Beam 的目的。
如果您无法更改文件:您可以编写一个 Python 脚本来执行此操作。
如果您可以更改生成文件的行为:您可以为“BEGIN”和“END”之间的每一行指定一个 uuid。而且您的文件甚至不需要包含原始顺序的行。
例如:
'burger=blue',
'burger=lettuce',
'burger=mayonise',
'fish=green',
'fish=strawberry',
'fish=ketchup',
'burger=pickle',
'fish=chips',
然后您可以并行处理所有行,将它们解析为 {key}={value}
,并按键分组到一个 PCollection 中,其中包含进一步转换的所有内容。
我刚开始使用 Python 在 Apache Beam 上学习,并在这个问题上坚持了一段时间,希望能得到任何擅长 Apache Beam 的人的帮助。
这是我的问题陈述:
我有一个如下所示的文本文件:
BEGIN=burger
blue
lettuce
mayonise
END=burger
BEGIN=fish
green
strawberry
ketchup
END=fish
我可以知道如何使用 apache beam 将汉堡和鱼分成不同的 PCollections 以便我可以对这 2 个 PCollections 执行不同的操作吗?
这里附上我在 Python
中的代码片段import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter
from apache_beam.io import ReadFromText
class SplitRow(beam.DoFn):
def process(self,element):
return element.splitlines()
def ExtractBurger(element):
if element == "BEGIN=burger":
return element
p = beam.Pipeline()
squares = (
p
# | "Read From Text" >> ReadFromText("gs://abc.txt")
| "Create dummy text file" >> Create([
'BEGIN=burger',
'blue',
'lettuce',
'mayonise',
'END=burger',
'BEGIN=fish',
'green',
'strawberry',
'ketchup',
'END=fish',
])
| "Decode and split lines" >> ParDo(SplitRow())
| "Extract out Burger" >> Filter(ExtractBurger)
| Map(print)
)
p.run()
我的输出是这样的
BEGIN=burger
我能够提取出包含“BEGIN=burger”的行,但我真正想要的是将“BEGIN=burger”到“END=burger”之间的所有数据提取到 1 个 PCollection 和“BEGIN”中=fish" 到 "END=fish" 到另一个 PCollection,不确定是否可以这样做,因为我觉得 Apache Beam 只能进行行操作,我如何编写一个逻辑来做这样的事情
- 如果找到 BEGIN=burger
- 继续循环遍历下一行,直到找到 END=burger
- 取出整个部分并将其写入 PCollection
如果有人能提供一些见解,我们将不胜感激!谢谢!
Beam 并行处理元素。所以不能保证它会按原来的顺序逐行处理。
为此,您必须使用状态 (https://beam.apache.org/blog/stateful-processing/) 来记录当前处理是否在 BEGIN 和 END 之间。并且您必须确保 Beam 及其运行器(无论您选择哪个运行器)的并行度为 1,以便它不会并行处理元素。 但这违背了使用 Beam 的目的。
如果您无法更改文件:您可以编写一个 Python 脚本来执行此操作。
如果您可以更改生成文件的行为:您可以为“BEGIN”和“END”之间的每一行指定一个 uuid。而且您的文件甚至不需要包含原始顺序的行。 例如:
'burger=blue', 'burger=lettuce', 'burger=mayonise', 'fish=green', 'fish=strawberry', 'fish=ketchup', 'burger=pickle', 'fish=chips',
然后您可以并行处理所有行,将它们解析为 {key}={value}
,并按键分组到一个 PCollection 中,其中包含进一步转换的所有内容。