为什么我需要洗牌我的 PCollection 才能在 Cloud Dataflow 上自动缩放?

Why do I need to shuffle my PCollection for it to autoscale on Cloud Dataflow?

上下文

我正在使用看起来像这样的过程从 Google 存储在 Beam 中读取文件:

data = pipeline | beam.Create(['gs://my/file.pkl']) | beam.ParDo(LoadFileDoFn)

其中 LoadFileDoFn 加载文件并从中创建一个 Python 对象列表,ParDo 然后 returns 作为 PCollection

我知道我可能可以实现自定义源来实现类似的东西,但是 and Beam's own documentation 表明这种通过 ParDo 读取伪数据集的方法并不少见,自定义源可能有点矫枉过正.

它也有效 - 我得到了一个 PCollection,其中包含正确数量的元素,我可以随意处理它!然而..

自动缩放问题

生成的 PCollection 根本不会在 Cloud Dataflow 上自动缩放。我首先必须通过以下方式对其进行转换:

shuffled_data = data | beam.Shuffle()

我知道 我在上面的链接中也解释了这个过程——但它没有给出任何关于为什么这是必要的见解。据我所知,在 Beam 的非常高的抽象级别上,我有一个 PCollection,在洗牌之前有 N 个元素,在洗牌之后有一个类似的 PCollection。为什么一个缩放,另一个不缩放?

文档在这种情况下不是很有帮助(或一般而言,但这是另一回事)。第一个 PCollection 有什么隐藏属性可以防止它被分发给另一个没有的多个工人?

当您通过 Create 阅读时,您正在创建一个绑定到 1 个工作人员的 PCollection。由于没有与项目关联的密钥,因此没有分发工作的机制。 Shuffle() 将在底层创建一个 K,V,然后进行洗牌,这使得 PCollection 项目能够在新工作人员启动时分发给他们。您通过关闭自动缩放并将工作人员大小固定为 25 来验证此行为 - 如果没有 Shuffle,您将只会看到 1 个工作人员在工作。

在 Creating/Reading 时分发此工作的另一种方法是构建您自己的自定义 I/O 以读取 PKL 文件 1。您将创建适当的分离器;但是,如果不知道你腌制了什么,它可能无法拆分。 IMO Shuffle() 是一个安全的选择,模数你可以通过编写一个可拆分的 reader.

来获得优化。