在 Apache Spark cogroup 中,如何确保不移动 >2 个操作数的 1 个 RDD?

In Apache Spark cogroup, how to make sure 1 RDD of >2 operands is not moved?

在同群 t运行形成中,例如RDD1.cogroup(RDD2, ...),我曾经假设 Spark 只有 shuffles/moves RDD2 并保留 RDD1 的分区和内存存储,如果:

  1. RDD1 有一个明确的分区器
  2. RDD1 已缓存。

在我的其他项目中,大多数洗牌行为似乎都符合这个假设。所以昨天我写了一个简短的scala程序来一劳永逸地证明它:

// sc is the SparkContext
val rdd1 = sc.parallelize(1 to 10, 4).map(v => v->v)
  .partitionBy(new HashPartitioner(4))
rdd1.persist().count()
val rdd2 = sc.parallelize(1 to 10, 4).map(v => (11-v)->v)

val cogrouped = rdd1.cogroup(rdd2).map {
  v =>
    v._2._1.head -> v._2._2.head
}

val zipped = cogrouped.zipPartitions(rdd1, rdd2) {
  (itr1, itr2, itr3) =>
    itr1.zipAll(itr2.map(_._2), 0->0, 0).zipAll(itr3.map(_._2), (0->0)->0, 0)
      .map {
        v =>
          (v._1._1._1, v._1._1._2, v._1._2, v._2)
      }
}

zipped.collect().foreach(println)

如果 rdd1 不移动,压缩的第一列应该与第三列具有相同的值,所以我 运行 程序,哎呀:

(4,7,4,1)
(8,3,8,2)
(1,10,1,3)
(9,2,5,4)
(5,6,9,5)
(6,5,2,6)
(10,1,6,7)
(2,9,10,0)
(3,8,3,8)
(7,4,7,9)
(0,0,0,10)

假设不成立。 Spark 可能做了一些内部优化,并决定重新生成 rdd1 的分区比将它们保留在缓存中要快得多。

所以问题是:如果我的编程要求不移动 RDD1(并保持缓存)是因为速度以外的其他原因(例如资源局部性),或者在某些情况下 Spark 内部优化不是可取的,是否有一种明确指示框架在所有类似 cogroup 的操作中不移动 ope运行d 的方法?这也包括连接、外部连接和 groupWith。

非常感谢您的帮助。到目前为止,我正在使用广播连接作为一种不太可扩展的临时解决方案,它不会持续很长时间就会导致我的集群崩溃。我期待一个符合分布式计算原则的解决方案。

If rdd1 doesn't move the first column of zipped should have the same value as the third column

这个假设是不正确的。创建CoGroupedRDD不仅仅是shuffle,而是生成匹配对应记录所需的内部结构。在内部,Spark 将使用它自己的 ExternalAppendOnlyMap which uses custom open hash table implementation (AppendOnlyMap),它不提供任何排序​​保证。

如果检查调试字符串:

zipped.toDebugString
(4) ZippedPartitionsRDD3[8] at zipPartitions at <console>:36 []
 |  MapPartitionsRDD[7] at map at <console>:31 []
 |  MapPartitionsRDD[6] at cogroup at <console>:31 []
 |  CoGroupedRDD[5] at cogroup at <console>:31 []
 |  ShuffledRDD[2] at partitionBy at <console>:27 []
 |      CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 +-(4) MapPartitionsRDD[1] at map at <console>:26 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:26 []
 +-(4) MapPartitionsRDD[4] at map at <console>:29 []
    |  ParallelCollectionRDD[3] at parallelize at <console>:29 []
 |  ShuffledRDD[2] at partitionBy at <console>:27 []
 |      CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 +-(4) MapPartitionsRDD[1]...

你会看到 Spark 确实使用 CachedPartitions 来计算 zipped RDD。如果您还跳过删除分区器的 map 转换,您会看到 coGroup 重用了 rdd1:

提供的分区器
rdd1.cogroup(rdd2).partitioner == rdd1.partitioner
Boolean = true