(Spark skewed join) 如何在没有内存问题的情况下连接两个具有高度重复键的大型 Spark RDD?
(Spark skewed join) How to join two large Spark RDDs with highly duplicated keys without memory issues?
在 this previous question 中,我试图通过避免使用 join
来 避免 Spark join
的内存问题。
在这个新问题中,我正在使用 join
,但试图用它修复内存问题。
这是我的两个 RDD:
productToCustomerRDD:
大小:非常大,可能有数百万个不同的键
使用 HashPartitioner
对键进行分区
有些键会 高度 重复,有些则不会。
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)
productToCountRDD:
大小:非常大,可能有数百万个不同的键,太大 broadcast
使用 HashPartitioner
对键进行分区
键 唯一 ,值是购买该产品的客户数量。
(toast, 2)
(butter, 1)
(jelly, 1)
我想加入这两个RDD,结果会是:
customerToProductAndCountRDD:
(toast, (John, 2))
(butter, (John, 1))
(toast, (Jane, 2))
(jelly, (Jane, 1))
如果我用 productToCustomerRDD.join(productToCountRDD)
加入两个 RDD,我会在两个分区(数千个)上得到一个 OutOfMemoryError
。在 Spark UI 中,我注意到在包含 join
的阶段,在 Input Size / Records
列中,所有分区都有来自 4K 到 700K。除了产生 OOM 的两个分区之外的所有分区:一个有 9M 条记录,一个有 6M 条记录。
据我所知,为了加入,具有相同键的对需要被打乱并移动到相同的分区(除非它们之前是按键分区的)。但是,由于某些键非常频繁(例如:数据集中几乎每个客户都购买的产品),因此可能会在 join
期间或在 join
期间将大量数据移动到一个分区repartition
就在加入之前。
我理解正确吗?
有没有办法避免这种情况?
有没有一种方法可以 join
而不是在同一分区上拥有一个重度重复键的所有数据?
我的第一个问题是:你真的需要这些详细的数据吗?你真的需要知道 jhon 买了 2 个 toats 等等吗?我们处于大数据环境中,我们处理大量数据,因此有时聚合是减少基数并在分析和性能方面获得良好结果的好方法。所以如果你想知道一个产品被卖了多少次,你可以使用 pairRDD(product, count) [这样你就可以为每个产品设置一个元素] 或者如果你想知道用户的偏好,你可以使用 pairRDD(用户,购买的产品列表)[这样你就会有每个用户的元素]。如果您真的需要知道吐司是从 Jhon 购买的,为什么要将吐司密钥拆分为不同的重新分区?通过这种方式,您无法计算全局结果,因为在每个块中,您将只有一条关于您的密钥的信息。
实际上,这是 Spark 中的一个标准问题,称为 "skewed join":连接的一侧是倾斜的,这意味着它的一些键比其他键更频繁。可以找到一些不适合我的答案 。
我使用的策略受到here and its use in ConnectedComponents.skewedJoin()
here定义的GraphFrame.skewedJoin()
方法的启发。
连接将通过使用广播连接连接最频繁的键和使用标准连接连接不太频繁的键来执行。
在我的示例 (OP) 中,productToCountRDD
已包含有关键频率的信息。
所以它是这样的:
- 过滤
productToCountRDD
以仅保留高于固定阈值的计数,并 collectAsMap()
给驱动程序。
- 将此地图广播给所有执行者。
- 将
productToCustomerRDD
分成两个 RDD:在广播映射中找到的键(频繁键)和不在广播映射中的键(不常见键)。
- 使用
mapToPair
执行频繁键的连接,从广播映射 中获取count
- 使用
join
执行不常见键的连接。
- 在最后使用
union
以获得完整的 RDD。
在 this previous question 中,我试图通过避免使用 join
来 避免 Spark join
的内存问题。
在这个新问题中,我正在使用 join
,但试图用它修复内存问题。
这是我的两个 RDD:
productToCustomerRDD:
大小:非常大,可能有数百万个不同的键
使用HashPartitioner
对键进行分区 有些键会 高度 重复,有些则不会。(toast, John) (butter, John) (toast, Jane) (jelly, Jane)
productToCountRDD:
大小:非常大,可能有数百万个不同的键,太大broadcast
使用HashPartitioner
对键进行分区 键 唯一 ,值是购买该产品的客户数量。(toast, 2) (butter, 1) (jelly, 1)
我想加入这两个RDD,结果会是:
customerToProductAndCountRDD:
(toast, (John, 2)) (butter, (John, 1)) (toast, (Jane, 2)) (jelly, (Jane, 1))
如果我用 productToCustomerRDD.join(productToCountRDD)
加入两个 RDD,我会在两个分区(数千个)上得到一个 OutOfMemoryError
。在 Spark UI 中,我注意到在包含 join
的阶段,在 Input Size / Records
列中,所有分区都有来自 4K 到 700K。除了产生 OOM 的两个分区之外的所有分区:一个有 9M 条记录,一个有 6M 条记录。
据我所知,为了加入,具有相同键的对需要被打乱并移动到相同的分区(除非它们之前是按键分区的)。但是,由于某些键非常频繁(例如:数据集中几乎每个客户都购买的产品),因此可能会在 join
期间或在 join
期间将大量数据移动到一个分区repartition
就在加入之前。
我理解正确吗?
有没有办法避免这种情况?
有没有一种方法可以 join
而不是在同一分区上拥有一个重度重复键的所有数据?
我的第一个问题是:你真的需要这些详细的数据吗?你真的需要知道 jhon 买了 2 个 toats 等等吗?我们处于大数据环境中,我们处理大量数据,因此有时聚合是减少基数并在分析和性能方面获得良好结果的好方法。所以如果你想知道一个产品被卖了多少次,你可以使用 pairRDD(product, count) [这样你就可以为每个产品设置一个元素] 或者如果你想知道用户的偏好,你可以使用 pairRDD(用户,购买的产品列表)[这样你就会有每个用户的元素]。如果您真的需要知道吐司是从 Jhon 购买的,为什么要将吐司密钥拆分为不同的重新分区?通过这种方式,您无法计算全局结果,因为在每个块中,您将只有一条关于您的密钥的信息。
实际上,这是 Spark 中的一个标准问题,称为 "skewed join":连接的一侧是倾斜的,这意味着它的一些键比其他键更频繁。可以找到一些不适合我的答案
我使用的策略受到here and its use in ConnectedComponents.skewedJoin()
here定义的GraphFrame.skewedJoin()
方法的启发。
连接将通过使用广播连接连接最频繁的键和使用标准连接连接不太频繁的键来执行。
在我的示例 (OP) 中,productToCountRDD
已包含有关键频率的信息。
所以它是这样的:
- 过滤
productToCountRDD
以仅保留高于固定阈值的计数,并collectAsMap()
给驱动程序。 - 将此地图广播给所有执行者。
- 将
productToCustomerRDD
分成两个 RDD:在广播映射中找到的键(频繁键)和不在广播映射中的键(不常见键)。 - 使用
mapToPair
执行频繁键的连接,从广播映射 中获取 - 使用
join
执行不常见键的连接。 - 在最后使用
union
以获得完整的 RDD。
count