RDD lineage cache
I can't understand RDD lineage. For example, suppose we have this lineage:
hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)
If we persist the first RDD and unpersist it after some actions, will this affect the other RDDs that depend on it? If so, how can we avoid that?
My point is: if we unpersist the parent RDD, does that action remove partitions from the child RDDs?
Let's walk through an example. This creates an RDD holding a sequence of integers in a single partition. The single partition is only there to keep the output ordered for the rest of the example.
scala> val seq = Seq(1,2,3,4,5)
seq: Seq[Int] = List(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(seq, 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:23
Now let's create two new RDDs that are mapped versions of the original RDD:
scala> val firstMappedRDD = rdd.map { case i => println(s"firstMappedRDD calc for $i"); i * 2 }
firstMappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:25
scala> firstMappedRDD.toDebugString
res25: String =
(1) MapPartitionsRDD[12] at map at <console>:25 []
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
scala> val secondMappedRDD = firstMappedRDD.map { case i => println(s"secondMappedRDD calc for $i"); i * 2 }
secondMappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at map at <console>:27
scala> secondMappedRDD.toDebugString
res26: String =
(1) MapPartitionsRDD[13] at map at <console>:27 []
| MapPartitionsRDD[12] at map at <console>:25 []
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
We can see the lineage with toDebugString. I added a println to each map step to make it clear when map is actually called. Let's collect each RDD and see what happens:
scala> firstMappedRDD.collect()
firstMappedRDD calc for 1
firstMappedRDD calc for 2
firstMappedRDD calc for 3
firstMappedRDD calc for 4
firstMappedRDD calc for 5
res27: Array[Int] = Array(2, 4, 6, 8, 10)
scala> secondMappedRDD.collect()
firstMappedRDD calc for 1
secondMappedRDD calc for 2
firstMappedRDD calc for 2
secondMappedRDD calc for 4
firstMappedRDD calc for 3
secondMappedRDD calc for 6
firstMappedRDD calc for 4
secondMappedRDD calc for 8
firstMappedRDD calc for 5
secondMappedRDD calc for 10
res28: Array[Int] = Array(4, 8, 12, 16, 20)
As you would expect, the map from the first step is called again when we call secondMappedRDD.collect(). So now let's cache the first mapped RDD.
scala> firstMappedRDD.cache()
res29: firstMappedRDD.type = MapPartitionsRDD[12] at map at <console>:25
scala> secondMappedRDD.toDebugString
res31: String =
(1) MapPartitionsRDD[13] at map at <console>:27 []
| MapPartitionsRDD[12] at map at <console>:25 []
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
scala> firstMappedRDD.count()
firstMappedRDD calc for 1
firstMappedRDD calc for 2
firstMappedRDD calc for 3
firstMappedRDD calc for 4
firstMappedRDD calc for 5
res32: Long = 5
scala> secondMappedRDD.toDebugString
res33: String =
(1) MapPartitionsRDD[13] at map at <console>:27 []
| MapPartitionsRDD[12] at map at <console>:25 []
| CachedPartitions: 1; MemorySize: 120.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
Once the result of the first map is in the cache, the lineage of the second mapped RDD includes the cached result of the first map. Now let's call collect:
scala> secondMappedRDD.collect
secondMappedRDD calc for 2
secondMappedRDD calc for 4
secondMappedRDD calc for 6
secondMappedRDD calc for 8
secondMappedRDD calc for 10
res34: Array[Int] = Array(4, 8, 12, 16, 20)
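As a quick side check (not part of the original session), we can confirm that the first mapped RDD is actually persisted; getStorageLevel is a standard RDD method, and cache() is simply persist(StorageLevel.MEMORY_ONLY):
// Hypothetical check, assuming the same session as above:
firstMappedRDD.getStorageLevel   // StorageLevel.MEMORY_ONLY after cache()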
Now let's unpersist and call collect again.
scala> firstMappedRDD.unpersist()
res36: firstMappedRDD.type = MapPartitionsRDD[12] at map at <console>:25
scala> secondMappedRDD.toDebugString
res37: String =
(1) MapPartitionsRDD[13] at map at <console>:27 []
| MapPartitionsRDD[12] at map at <console>:25 []
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
scala> secondMappedRDD.collect
firstMappedRDD calc for 1
secondMappedRDD calc for 2
firstMappedRDD calc for 2
secondMappedRDD calc for 4
firstMappedRDD calc for 3
secondMappedRDD calc for 6
firstMappedRDD calc for 4
secondMappedRDD calc for 8
firstMappedRDD calc for 5
secondMappedRDD calc for 10
res38: Array[Int] = Array(4, 8, 12, 16, 20)
So when we collect the second mapped RDD after the first mapped RDD has been unpersisted, the first map step is computed again. If the source were HDFS or any other storage, the data would be retrieved from the source again.
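That also answers the "how can we avoid it?" part of the question: the child recomputes the parent only because nothing between the source and the collected RDD is persisted anymore. A minimal sketch, continuing the hypothetical session above, is to persist the child and materialize it before unpersisting the parent:
// Persist the child first, materialize it, then drop the parent's cache.
// Later actions on secondMappedRDD are served from its own cached
// partitions, so firstMappedRDD's map function is never re-run.
secondMappedRDD.cache()
secondMappedRDD.count()      // action that fills secondMappedRDD's cache
firstMappedRDD.unpersist()   // safe now: the child no longer needs the parent
secondMappedRDD.collect()    // no "calc" printlns from either map step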
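If the goal is to cut the lineage itself rather than just cache intermediate results, checkpointing is another option. A hedged sketch, assuming a writable checkpoint directory (the path below is made up):
// Checkpointing writes the RDD's partitions to reliable storage and
// truncates its lineage, so ancestors are never recomputed afterwards.
// Caching first avoids computing the RDD twice (once for the action,
// once for the checkpoint write).
sc.setCheckpointDir("/tmp/spark-checkpoints")   // hypothetical path
firstMappedRDD.cache()
firstMappedRDD.checkpoint()   // must be called before the first action
firstMappedRDD.count()        // materializes both the cache and the checkpoint
// firstMappedRDD.toDebugString now shows a ReliableCheckpointRDD in place
// of the ParallelCollectionRDD ancestor.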