在 Apache Spark cogroup 中,如何确保不移动 >2 个操作数的 1 个 RDD?
In Apache Spark cogroup, how to make sure 1 RDD of >2 operands is not moved?
在同群 t运行形成中,例如RDD1.cogroup(RDD2, ...),我曾经假设 Spark 只有 shuffles/moves RDD2 并保留 RDD1 的分区和内存存储,如果:
- RDD1 有一个明确的分区器
- RDD1 已缓存。
在我的其他项目中,大多数洗牌行为似乎都符合这个假设。所以昨天我写了一个简短的scala程序来一劳永逸地证明它:
// sc is the SparkContext
val rdd1 = sc.parallelize(1 to 10, 4).map(v => v->v)
.partitionBy(new HashPartitioner(4))
rdd1.persist().count()
val rdd2 = sc.parallelize(1 to 10, 4).map(v => (11-v)->v)
val cogrouped = rdd1.cogroup(rdd2).map {
v =>
v._2._1.head -> v._2._2.head
}
val zipped = cogrouped.zipPartitions(rdd1, rdd2) {
(itr1, itr2, itr3) =>
itr1.zipAll(itr2.map(_._2), 0->0, 0).zipAll(itr3.map(_._2), (0->0)->0, 0)
.map {
v =>
(v._1._1._1, v._1._1._2, v._1._2, v._2)
}
}
zipped.collect().foreach(println)
如果 rdd1 不移动,压缩的第一列应该与第三列具有相同的值,所以我 运行 程序,哎呀:
(4,7,4,1)
(8,3,8,2)
(1,10,1,3)
(9,2,5,4)
(5,6,9,5)
(6,5,2,6)
(10,1,6,7)
(2,9,10,0)
(3,8,3,8)
(7,4,7,9)
(0,0,0,10)
假设不成立。 Spark 可能做了一些内部优化,并决定重新生成 rdd1 的分区比将它们保留在缓存中要快得多。
所以问题是:如果我的编程要求不移动 RDD1(并保持缓存)是因为速度以外的其他原因(例如资源局部性),或者在某些情况下 Spark 内部优化不是可取的,是否有一种明确指示框架在所有类似 cogroup 的操作中不移动 ope运行d 的方法?这也包括连接、外部连接和 groupWith。
非常感谢您的帮助。到目前为止,我正在使用广播连接作为一种不太可扩展的临时解决方案,它不会持续很长时间就会导致我的集群崩溃。我期待一个符合分布式计算原则的解决方案。
If rdd1 doesn't move the first column of zipped should have the same value as the third column
这个假设是不正确的。创建CoGroupedRDD
不仅仅是shuffle,而是生成匹配对应记录所需的内部结构。在内部,Spark 将使用它自己的 ExternalAppendOnlyMap
which uses custom open hash table implementation (AppendOnlyMap
),它不提供任何排序保证。
如果检查调试字符串:
zipped.toDebugString
(4) ZippedPartitionsRDD3[8] at zipPartitions at <console>:36 []
| MapPartitionsRDD[7] at map at <console>:31 []
| MapPartitionsRDD[6] at cogroup at <console>:31 []
| CoGroupedRDD[5] at cogroup at <console>:31 []
| ShuffledRDD[2] at partitionBy at <console>:27 []
| CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
+-(4) MapPartitionsRDD[1] at map at <console>:26 []
| ParallelCollectionRDD[0] at parallelize at <console>:26 []
+-(4) MapPartitionsRDD[4] at map at <console>:29 []
| ParallelCollectionRDD[3] at parallelize at <console>:29 []
| ShuffledRDD[2] at partitionBy at <console>:27 []
| CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
+-(4) MapPartitionsRDD[1]...
你会看到 Spark 确实使用 CachedPartitions
来计算 zipped
RDD
。如果您还跳过删除分区器的 map
转换,您会看到 coGroup
重用了 rdd1
:
提供的分区器
rdd1.cogroup(rdd2).partitioner == rdd1.partitioner
Boolean = true
在同群 t运行形成中,例如RDD1.cogroup(RDD2, ...),我曾经假设 Spark 只有 shuffles/moves RDD2 并保留 RDD1 的分区和内存存储,如果:
- RDD1 有一个明确的分区器
- RDD1 已缓存。
在我的其他项目中,大多数洗牌行为似乎都符合这个假设。所以昨天我写了一个简短的scala程序来一劳永逸地证明它:
// sc is the SparkContext
val rdd1 = sc.parallelize(1 to 10, 4).map(v => v->v)
.partitionBy(new HashPartitioner(4))
rdd1.persist().count()
val rdd2 = sc.parallelize(1 to 10, 4).map(v => (11-v)->v)
val cogrouped = rdd1.cogroup(rdd2).map {
v =>
v._2._1.head -> v._2._2.head
}
val zipped = cogrouped.zipPartitions(rdd1, rdd2) {
(itr1, itr2, itr3) =>
itr1.zipAll(itr2.map(_._2), 0->0, 0).zipAll(itr3.map(_._2), (0->0)->0, 0)
.map {
v =>
(v._1._1._1, v._1._1._2, v._1._2, v._2)
}
}
zipped.collect().foreach(println)
如果 rdd1 不移动,压缩的第一列应该与第三列具有相同的值,所以我 运行 程序,哎呀:
(4,7,4,1)
(8,3,8,2)
(1,10,1,3)
(9,2,5,4)
(5,6,9,5)
(6,5,2,6)
(10,1,6,7)
(2,9,10,0)
(3,8,3,8)
(7,4,7,9)
(0,0,0,10)
假设不成立。 Spark 可能做了一些内部优化,并决定重新生成 rdd1 的分区比将它们保留在缓存中要快得多。
所以问题是:如果我的编程要求不移动 RDD1(并保持缓存)是因为速度以外的其他原因(例如资源局部性),或者在某些情况下 Spark 内部优化不是可取的,是否有一种明确指示框架在所有类似 cogroup 的操作中不移动 ope运行d 的方法?这也包括连接、外部连接和 groupWith。
非常感谢您的帮助。到目前为止,我正在使用广播连接作为一种不太可扩展的临时解决方案,它不会持续很长时间就会导致我的集群崩溃。我期待一个符合分布式计算原则的解决方案。
If rdd1 doesn't move the first column of zipped should have the same value as the third column
这个假设是不正确的。创建CoGroupedRDD
不仅仅是shuffle,而是生成匹配对应记录所需的内部结构。在内部,Spark 将使用它自己的 ExternalAppendOnlyMap
which uses custom open hash table implementation (AppendOnlyMap
),它不提供任何排序保证。
如果检查调试字符串:
zipped.toDebugString
(4) ZippedPartitionsRDD3[8] at zipPartitions at <console>:36 []
| MapPartitionsRDD[7] at map at <console>:31 []
| MapPartitionsRDD[6] at cogroup at <console>:31 []
| CoGroupedRDD[5] at cogroup at <console>:31 []
| ShuffledRDD[2] at partitionBy at <console>:27 []
| CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
+-(4) MapPartitionsRDD[1] at map at <console>:26 []
| ParallelCollectionRDD[0] at parallelize at <console>:26 []
+-(4) MapPartitionsRDD[4] at map at <console>:29 []
| ParallelCollectionRDD[3] at parallelize at <console>:29 []
| ShuffledRDD[2] at partitionBy at <console>:27 []
| CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
+-(4) MapPartitionsRDD[1]...
你会看到 Spark 确实使用 CachedPartitions
来计算 zipped
RDD
。如果您还跳过删除分区器的 map
转换,您会看到 coGroup
重用了 rdd1
:
rdd1.cogroup(rdd2).partitioner == rdd1.partitioner
Boolean = true