spark 的 distinct() 函数是否仅混洗每个分区中的不同元组
Does spark's distinct() function shuffle only the distinct tuples from each partition
据我了解,distinct() 散列对 RDD 进行分区以标识唯一键。但是它是否优化了每个分区只移动不同的元组?
想象一个具有以下分区的 RDD
- [1, 2, 2, 1, 4, 2, 2]
- [1, 3, 3, 5, 4, 5, 5, 5]
在这个 RDD 上的不同键上,所有重复的键(分区 1 中的 2 和分区 2 中的 5)是否会被洗牌到它们的目标分区,或者每个分区只有不同的键被洗牌到目标?
如果所有键都被打乱,那么带有 set() 操作的 aggregate() 将减少打乱。
def set_update(u, v):
u.add(v)
return u
rdd.aggregate(set(), set_update, lambda u1,u2: u1|u2)
unique
通过 reduceByKey
在 (element, None)
对上实现。所以它只对每个分区的唯一值进行洗牌。如果重复次数很少,它仍然是相当昂贵的操作。
有些情况下使用 set
会很有用。特别是如果您在 PairwseRDD
上调用 distinct
,您可能更喜欢 aggregateByKey
/ combineByKey
而不是同时实现重复数据删除和按键分区。特别考虑以下代码:
rdd1 = sc.parallelize([("foo", 1), ("foo", 1), ("bar", 1)])
rdd2 = sc.parallelize([("foo", "x"), ("bar", "y")])
rdd1.distinct().join(rdd2)
它必须洗牌 rdd1
两次 - 一次 distinct
一次 join
一次。相反,您可以使用 combineByKey
:
def flatten(kvs):
(key, (left, right)) = kvs
for v in left:
yield (key, (v, right))
aggregated = (rdd1
.aggregateByKey(set(), set_update, lambda u1, u2: u1 | u2))
rdd2_partitioned = rdd2.partitionBy(aggregated.getNumPartitions())
(aggregated.join(rdd2_partitioned)
.flatMap(flatten))
注意:
join
逻辑在 Scala 中与在 Python 中略有不同(PySpark 使用 union
后跟 groupByKey
,参见 Python 和 Scala DAG),因此我们必须在调用 join.
之前手动划分第二个 RDD
据我了解,distinct() 散列对 RDD 进行分区以标识唯一键。但是它是否优化了每个分区只移动不同的元组?
想象一个具有以下分区的 RDD
- [1, 2, 2, 1, 4, 2, 2]
- [1, 3, 3, 5, 4, 5, 5, 5]
在这个 RDD 上的不同键上,所有重复的键(分区 1 中的 2 和分区 2 中的 5)是否会被洗牌到它们的目标分区,或者每个分区只有不同的键被洗牌到目标?
如果所有键都被打乱,那么带有 set() 操作的 aggregate() 将减少打乱。
def set_update(u, v):
u.add(v)
return u
rdd.aggregate(set(), set_update, lambda u1,u2: u1|u2)
unique
通过 reduceByKey
在 (element, None)
对上实现。所以它只对每个分区的唯一值进行洗牌。如果重复次数很少,它仍然是相当昂贵的操作。
有些情况下使用 set
会很有用。特别是如果您在 PairwseRDD
上调用 distinct
,您可能更喜欢 aggregateByKey
/ combineByKey
而不是同时实现重复数据删除和按键分区。特别考虑以下代码:
rdd1 = sc.parallelize([("foo", 1), ("foo", 1), ("bar", 1)])
rdd2 = sc.parallelize([("foo", "x"), ("bar", "y")])
rdd1.distinct().join(rdd2)
它必须洗牌 rdd1
两次 - 一次 distinct
一次 join
一次。相反,您可以使用 combineByKey
:
def flatten(kvs):
(key, (left, right)) = kvs
for v in left:
yield (key, (v, right))
aggregated = (rdd1
.aggregateByKey(set(), set_update, lambda u1, u2: u1 | u2))
rdd2_partitioned = rdd2.partitionBy(aggregated.getNumPartitions())
(aggregated.join(rdd2_partitioned)
.flatMap(flatten))
注意:
join
逻辑在 Scala 中与在 Python 中略有不同(PySpark 使用 union
后跟 groupByKey
,参见
RDD