计算 GroupBy 一次,然后将其传递给 Google DataFlow(Python SDK)中的多个转换
Computing GroupBy once then passing it to multiple transformations in Google DataFlow (Python SDK)
我正在使用 Apache Beam 的 Python SDK 在 Google DataFlow 上 运行 特征提取管道。我需要 运行 多个转换,所有转换都希望项目按键分组。
根据这个的回答,DataFlow无法像GroupBy一样自动发现和重用重复的转换,所以我希望先运行GroupBy,然后将结果PCollection提供给其他转换(参见下面的示例代码)。
我想知道这是否应该在 DataFlow 中有效地工作。如果不是,Python SDK 中推荐的解决方法是什么?有没有一种有效的方法可以让多个 Map 或 Write 转换获取同一个 GroupBy 的结果?就我而言,我观察到 DataFlow 以 5% 的利用率扩展到最大工作人员数量,并且按照 question.
中所述的 GroupBy 之后的步骤没有取得任何进展
示例代码。为简单起见,仅显示 2 个转换。
# Group by key once.
items_by_key = raw_items | GroupByKey()
# Write groupped items to a file.
(items_by_key | FlatMap(format_item) | WriteToText(path))
# Run another transformation over the same group.
features = (items_by_key | Map(extract_features))
将单个 GroupByKey
步骤的输出馈送到多个转换中应该可以正常工作。但是您可以获得的并行化量取决于原始 GroupByKey
步骤中可用的键总数。如果任何一个下游步骤是高扇出,请考虑在这些步骤之后添加一个 Reshuffle 步骤,这将允许 Dataflow 进一步并行执行。
例如,
pipeline | Create([<list of globs>]) | ParDo(ExpandGlobDoFn()) | Reshuffle() | ParDo(MyreadDoFn()) | Reshuffle() | ParDo(MyProcessDoFn())
这里,
ExpandGlobDoFn
:扩展输入 glob 并生成文件
MyReadDoFn
: 读取给定文件
MyProcessDoFn
: 处理从文件中读取的元素
我在这里使用了两个 Reshuffle
(注意 Reshuffle
中有一个 GroupByKey
)以允许 (1) 从给定的 glob 中并行读取文件 (2) 并行处理给定文件中的元素。
根据我在问题排查 this SO question 方面的经验,在多个转换中重复使用 GroupBy 输出会使您的管道极其缓慢。至少这是我使用 Apache Beam SDK 2.11.0 for Python.
的经验
常识告诉我,从执行图中的单个 GroupBy 分支出来应该会使我的管道 运行 更快。在 120 多名工人 运行ning 23 小时后,管道无法取得任何重大进展。我尝试添加重新洗牌,尽可能使用组合器并禁用实验性洗牌服务。
在我将管道分成两部分之前没有任何帮助。第一个管道计算 GroupBy 并将其存储在一个文件中(我需要将其 "as is" 提取到数据库中)。第二个读取带有 GroupBy 输出的文件,读取额外的输入和 运行 进一步的转换。结果 - 所有转换都在 2 小时内成功完成。我想如果我只是在原来的管道中复制 GroupBy,我可能会取得相同的结果。
我想知道这是 DataFlow 执行引擎或 Python SDK 中的错误,还是它按预期工作。如果是设计的,那么至少应该文档化,像这样的pipeline在提交的时候不要接受,不然应该有warning。
您可以通过查看 "Group keywords" 步骤中出现的 2 个分支来发现此问题。看起来解决方案是分别为每个分支重新运行 GroupBy。
我正在使用 Apache Beam 的 Python SDK 在 Google DataFlow 上 运行 特征提取管道。我需要 运行 多个转换,所有转换都希望项目按键分组。
根据这个
我想知道这是否应该在 DataFlow 中有效地工作。如果不是,Python SDK 中推荐的解决方法是什么?有没有一种有效的方法可以让多个 Map 或 Write 转换获取同一个 GroupBy 的结果?就我而言,我观察到 DataFlow 以 5% 的利用率扩展到最大工作人员数量,并且按照 question.
中所述的 GroupBy 之后的步骤没有取得任何进展示例代码。为简单起见,仅显示 2 个转换。
# Group by key once.
items_by_key = raw_items | GroupByKey()
# Write groupped items to a file.
(items_by_key | FlatMap(format_item) | WriteToText(path))
# Run another transformation over the same group.
features = (items_by_key | Map(extract_features))
将单个 GroupByKey
步骤的输出馈送到多个转换中应该可以正常工作。但是您可以获得的并行化量取决于原始 GroupByKey
步骤中可用的键总数。如果任何一个下游步骤是高扇出,请考虑在这些步骤之后添加一个 Reshuffle 步骤,这将允许 Dataflow 进一步并行执行。
例如,
pipeline | Create([<list of globs>]) | ParDo(ExpandGlobDoFn()) | Reshuffle() | ParDo(MyreadDoFn()) | Reshuffle() | ParDo(MyProcessDoFn())
这里,
ExpandGlobDoFn
:扩展输入 glob 并生成文件MyReadDoFn
: 读取给定文件MyProcessDoFn
: 处理从文件中读取的元素
我在这里使用了两个 Reshuffle
(注意 Reshuffle
中有一个 GroupByKey
)以允许 (1) 从给定的 glob 中并行读取文件 (2) 并行处理给定文件中的元素。
根据我在问题排查 this SO question 方面的经验,在多个转换中重复使用 GroupBy 输出会使您的管道极其缓慢。至少这是我使用 Apache Beam SDK 2.11.0 for Python.
的经验常识告诉我,从执行图中的单个 GroupBy 分支出来应该会使我的管道 运行 更快。在 120 多名工人 运行ning 23 小时后,管道无法取得任何重大进展。我尝试添加重新洗牌,尽可能使用组合器并禁用实验性洗牌服务。
在我将管道分成两部分之前没有任何帮助。第一个管道计算 GroupBy 并将其存储在一个文件中(我需要将其 "as is" 提取到数据库中)。第二个读取带有 GroupBy 输出的文件,读取额外的输入和 运行 进一步的转换。结果 - 所有转换都在 2 小时内成功完成。我想如果我只是在原来的管道中复制 GroupBy,我可能会取得相同的结果。
我想知道这是 DataFlow 执行引擎或 Python SDK 中的错误,还是它按预期工作。如果是设计的,那么至少应该文档化,像这样的pipeline在提交的时候不要接受,不然应该有warning。
您可以通过查看 "Group keywords" 步骤中出现的 2 个分支来发现此问题。看起来解决方案是分别为每个分支重新运行 GroupBy。