spark 的 distinct() 函数是否仅混洗每个分区中的不同元组

Does spark's distinct() function shuffle only the distinct tuples from each partition

据我了解,distinct() 散列对 RDD 进行分区以标识唯一键。但是它是否优化了每个分区只移动不同的元组?

想象一个具有以下分区的 RDD

  1. [1, 2, 2, 1, 4, 2, 2]
  2. [1, 3, 3, 5, 4, 5, 5, 5]

在这个 RDD 上的不同键上,所有重复的键(分区 1 中的 2 和分区 2 中的 5)是否会被洗牌到它们的目标分区,或者每个分区只有不同的键被洗牌到目标?

如果所有键都被打乱,那么带有 set() 操作的 aggregate() 将减少打乱。

def set_update(u, v):
    u.add(v)
    return u
rdd.aggregate(set(), set_update, lambda u1,u2: u1|u2)

unique 通过 reduceByKey(element, None) 对上实现。所以它只对每个分区的唯一值进行洗牌。如果重复次数很少,它仍然是相当昂贵的操作。

有些情况下使用 set 会很有用。特别是如果您在 PairwseRDD 上调用 distinct,您可能更喜欢 aggregateByKey / combineByKey 而不是同时实现重复数据删除和按键分区。特别考虑以下代码:

rdd1 = sc.parallelize([("foo", 1), ("foo", 1), ("bar", 1)])
rdd2 = sc.parallelize([("foo", "x"), ("bar", "y")])
rdd1.distinct().join(rdd2)

它必须洗牌 rdd1 两次 - 一次 distinct 一次 join 一次。相反,您可以使用 combineByKey:

def flatten(kvs):
    (key, (left, right)) = kvs
    for v in left:
        yield (key, (v, right))

aggregated = (rdd1
    .aggregateByKey(set(), set_update, lambda u1, u2: u1 | u2))

rdd2_partitioned = rdd2.partitionBy(aggregated.getNumPartitions())

(aggregated.join(rdd2_partitioned)
    .flatMap(flatten))

注意

join 逻辑在 Scala 中与在 Python 中略有不同(PySpark 使用 union 后跟 groupByKey,参见 Python 和 Scala DAG),因此我们必须在调用 join.

之前手动划分第二个 RDD