getPersistentRDDs returns a Map of cached RDDs and DataFrames in Spark 2.2.0, but in Spark 2.4.7 it returns a Map of cached RDDs only

If you cache an RDD and a DataFrame in Spark 2.2.0, getPersistentRDDs returns a Map of size 2:

scala> val rdd = sc.parallelize(Seq(1))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val df = sc.parallelize(Seq(2)).toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> spark.sparkContext.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.cache
res1: df.type = [value: int]

scala> spark.sparkContext.getPersistentRDDs
res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
Map(4 -> *SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
 MapPartitionsRDD[4] at cache at <console>:27)

scala> rdd.cache
res3: rdd.type = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> spark.sparkContext.getPersistentRDDs
res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
Map(0 -> ParallelCollectionRDD[0] at parallelize at <console>:24, 4 -> *SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
 MapPartitionsRDD[4] at cache at <console>:27)

But in Spark 2.4.7, getPersistentRDDs returns a Map of size 1:

...
scala> spark.sparkContext.getPersistentRDDs
res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(0 -> ParallelCollectionRDD[0] at parallelize at <console>:24)

What changed that makes the method suddenly behave differently, and how can I get all cached objects, not just the RDDs?

The DataFrame is not actually cached in memory yet, because no action has been executed on it, so it is arguably fair to exclude it from the result of getPersistentRDDs. I think the behavior in the later version is actually preferable. But once you run an action on the DataFrame, it does get cached and it shows up in the result of getPersistentRDDs, as shown below:

scala> val df = sc.parallelize(Seq(2)).toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> sc.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.cache
res1: df.type = [value: int]

scala> sc.getPersistentRDDs
res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.count()
res3: Long = 1

scala> sc.getPersistentRDDs
res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
Map(3 -> *(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan[obj#1]
 MapPartitionsRDD[3] at count at <console>:26)
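
As for listing all cached objects: as far as I know there is no public API that enumerates every cached DataFrame (those are tracked by Spark SQL's internal CacheManager), but you can check individual Datasets or named views. Below is a minimal sketch using the public Dataset.storageLevel and spark.catalog.isCached/cacheTable APIs; the view name "cached_view" is just an illustrative placeholder:

import org.apache.spark.storage.StorageLevel

// A given Dataset can be checked directly: storageLevel reflects the
// requested caching as soon as cache() is called, even though no job has
// run yet and getPersistentRDDs still shows nothing.
val df = spark.range(1).toDF("value")
df.cache()
println(df.storageLevel)                          // MEMORY_AND_DISK
println(df.storageLevel != StorageLevel.NONE)     // true: marked for caching

// For data registered as a (temp) view, the catalog can be queried by name.
spark.range(1).toDF("value").createOrReplaceTempView("cached_view")
spark.catalog.cacheTable("cached_view")
println(spark.catalog.isCached("cached_view"))    // true

So if you need a complete picture of what is cached, you have to track your cached Datasets/views yourself and query them with these calls, rather than relying on getPersistentRDDs alone.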