Spark 列出所有缓存的 RDD 名称并取消持久化
Spark list all cached RDD names and unpersist
我是 Apache Spark 的新手,我创建了几个 RDD 和 DataFrame,缓存了它们,现在我想使用下面的命令取消保留其中的一些
rddName.unpersist()
但我记不起他们的名字了。我使用 sc.getPersistentRDDs
但输出不包含名称。我还使用浏览器查看缓存的 rdds,但同样没有名称信息。我错过了什么吗?
rrdName
变量没有特殊含义。它只是对 RDD 的引用。例如,在下面的代码中
val rrdName: RDD[Something]
val name2 = rrdName
name2
和 rrdName
是指向同一个 RDD 的两个引用。调用 name2.unpersist
与调用 rrdName.unpersist
相同。
如果你想unpersist
一个RDD,你必须手动保留对它的引用。
@Dikei 的回答其实是正确的,但我相信你要找的是 sc.getPersistentRDDs
:
scala> val rdd1 = sc.makeRDD(1 to 100)
# rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:27
scala> val rdd2 = sc.makeRDD(10 to 1000)
# rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:27
scala> rdd2.cache.setName("rdd_2")
# res0: rdd2.type = rdd_2 ParallelCollectionRDD[1] at makeRDD at <console>:27
scala> sc.getPersistentRDDs
# res1: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(1 -> rdd_2 ParallelCollectionRDD[1] at makeRDD at <console>:27)
scala> rdd1.cache.setName("foo")
# res2: rdd1.type = foo ParallelCollectionRDD[0] at makeRDD at <console>:27
scala> sc.getPersistentRDDs
# res3: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(1 -> rdd_2 ParallelCollectionRDD[1] at makeRDD at <console>:27, 0 -> foo ParallelCollectionRDD[0] at makeRDD at <console>:27)
现在让我们添加另一个 RDD
并命名为:
scala> rdd3.setName("bar")
# res4: rdd3.type = bar ParallelCollectionRDD[2] at makeRDD at <console>:27
scala> sc.getPersistentRDDs
# res5: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(1 -> rdd_2 ParallelCollectionRDD[1] at makeRDD at <console>:27, 0 -> foo ParallelCollectionRDD[0] at makeRDD at <console>:27)
我们注意到它实际上并没有持久化。
PySparkers:getPersistentRDDs isn't yet implemented in Python,所以通过深入 Java:
取消持久化你的 RDDs
for (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
rdd.unpersist()
Scala 这样做的通用方法...通过 spark 上下文循环获取所有持久性 RDD 和 unpersist
。
我将在驱动程序的末尾使用它。
for ( (id,rdd) <- sparkSession.sparkContext.getPersistentRDDs ) {
log.info("Unexpected cached RDD " + id)
rdd.unpersist()
}
Java 这样做的通用方法...其中 jsc 是 JavaSparkContext
if (jsc != null) {
Map<Integer, JavaRDD<?>> persistentRDDS = jsc.getPersistentRDDs();
// using for-each loop for iteration over Map.entrySet()
for (Map.Entry<Integer, JavaRDD<?>> entry : persistentRDDS.entrySet()) {
LOG.info("Key = " + entry.getKey() +
", un persisting cached RDD = " + entry.getValue().unpersist());
}
}
java 中 unpersist
的另一种缩写形式,不知道 rdd 名称是:
Map<Integer, JavaRDD<?>> persistentRDDS = jsc.getPersistentRDDs();
persistentRDDS.values().forEach(JavaRDD::unpersist);
我是 Apache Spark 的新手,我创建了几个 RDD 和 DataFrame,缓存了它们,现在我想使用下面的命令取消保留其中的一些
rddName.unpersist()
但我记不起他们的名字了。我使用 sc.getPersistentRDDs
但输出不包含名称。我还使用浏览器查看缓存的 rdds,但同样没有名称信息。我错过了什么吗?
rrdName
变量没有特殊含义。它只是对 RDD 的引用。例如,在下面的代码中
val rrdName: RDD[Something]
val name2 = rrdName
name2
和 rrdName
是指向同一个 RDD 的两个引用。调用 name2.unpersist
与调用 rrdName.unpersist
相同。
如果你想unpersist
一个RDD,你必须手动保留对它的引用。
@Dikei 的回答其实是正确的,但我相信你要找的是 sc.getPersistentRDDs
:
scala> val rdd1 = sc.makeRDD(1 to 100)
# rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:27
scala> val rdd2 = sc.makeRDD(10 to 1000)
# rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:27
scala> rdd2.cache.setName("rdd_2")
# res0: rdd2.type = rdd_2 ParallelCollectionRDD[1] at makeRDD at <console>:27
scala> sc.getPersistentRDDs
# res1: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(1 -> rdd_2 ParallelCollectionRDD[1] at makeRDD at <console>:27)
scala> rdd1.cache.setName("foo")
# res2: rdd1.type = foo ParallelCollectionRDD[0] at makeRDD at <console>:27
scala> sc.getPersistentRDDs
# res3: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(1 -> rdd_2 ParallelCollectionRDD[1] at makeRDD at <console>:27, 0 -> foo ParallelCollectionRDD[0] at makeRDD at <console>:27)
现在让我们添加另一个 RDD
并命名为:
scala> rdd3.setName("bar")
# res4: rdd3.type = bar ParallelCollectionRDD[2] at makeRDD at <console>:27
scala> sc.getPersistentRDDs
# res5: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(1 -> rdd_2 ParallelCollectionRDD[1] at makeRDD at <console>:27, 0 -> foo ParallelCollectionRDD[0] at makeRDD at <console>:27)
我们注意到它实际上并没有持久化。
PySparkers:getPersistentRDDs isn't yet implemented in Python,所以通过深入 Java:
取消持久化你的 RDDsfor (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
rdd.unpersist()
Scala 这样做的通用方法...通过 spark 上下文循环获取所有持久性 RDD 和 unpersist
。
我将在驱动程序的末尾使用它。
for ( (id,rdd) <- sparkSession.sparkContext.getPersistentRDDs ) {
log.info("Unexpected cached RDD " + id)
rdd.unpersist()
}
Java 这样做的通用方法...其中 jsc 是 JavaSparkContext
if (jsc != null) {
Map<Integer, JavaRDD<?>> persistentRDDS = jsc.getPersistentRDDs();
// using for-each loop for iteration over Map.entrySet()
for (Map.Entry<Integer, JavaRDD<?>> entry : persistentRDDS.entrySet()) {
LOG.info("Key = " + entry.getKey() +
", un persisting cached RDD = " + entry.getValue().unpersist());
}
}
java 中 unpersist
的另一种缩写形式,不知道 rdd 名称是:
Map<Integer, JavaRDD<?>> persistentRDDS = jsc.getPersistentRDDs();
persistentRDDS.values().forEach(JavaRDD::unpersist);