spark 中的 cache() 是改变 RDD 的状态还是创建一个新的?
Does cache() in spark change the state of the RDD or create a new one?
这个问题是我 .
上一个问题的后续问题
当在RDD上调用cache()
时,RDD的状态是否改变了(返回的RDD只是this
以便于使用)或者创建一个新的RDD来包装现有的一个?
以下代码会发生什么:
// Init
JavaRDD<String> a = ... // some initialise and calculation functions.
JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();
// Case 1, will 'a' be calculated twice in this case
// because it's before the cache layer:
a.saveAsTextFile(somePath);
a.saveAsTextFile(somePath);
// Case 2, will the data of the calculation of 'a'
// be cached in the memory twice in this case
// (once as 'b' and once as 'c'):
c.saveAsTextFile(somePath);
缓存不会改变 RDD 的状态。
发生转换时,缓存会在内存中计算和具体化 RDD,同时跟踪其沿袭(依赖项)。坚持有很多层次。
由于缓存会记住 RDD 的沿袭,因此 Spark 可以在节点发生故障时重新计算丢失的分区。最后,缓存的 RDD 存在于 运行 应用程序的上下文中,一旦应用程序终止,缓存的 RDDs 也会被删除。
When calling cache() on a RDD, does the state of the RDD changed (and
the returned RDD is just this for ease of use) or a new RDD is created
the wrapped the existing one
/**
* Mark this RDD for persisting using the specified level.
*
* @param newLevel the target storage level
* @param allowOverride whether to override any existing level with the new one
*/
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
storageLevel = newLevel
this
}
缓存不会对上述 RDD 造成任何副作用。如果它已经被标记为持久化,则什么也不会发生。如果不是,唯一的副作用是将其注册到 SparkContext
,其中副作用不在 RDD
本身,而是上下文。
编辑:
看JavaRDD.cache
,好像是底层调用会引起另外一个JavaRDD
的分配:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
其中 wrapRDD
调用 JavaRDD.fromRDD
:
object JavaRDD {
implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}
这会导致分配一个新的JavaRDD
。也就是说,RDD[T]
的内部实例将保持不变。
这个问题是我
当在RDD上调用cache()
时,RDD的状态是否改变了(返回的RDD只是this
以便于使用)或者创建一个新的RDD来包装现有的一个?
以下代码会发生什么:
// Init
JavaRDD<String> a = ... // some initialise and calculation functions.
JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();
// Case 1, will 'a' be calculated twice in this case
// because it's before the cache layer:
a.saveAsTextFile(somePath);
a.saveAsTextFile(somePath);
// Case 2, will the data of the calculation of 'a'
// be cached in the memory twice in this case
// (once as 'b' and once as 'c'):
c.saveAsTextFile(somePath);
缓存不会改变 RDD 的状态。
发生转换时,缓存会在内存中计算和具体化 RDD,同时跟踪其沿袭(依赖项)。坚持有很多层次。
由于缓存会记住 RDD 的沿袭,因此 Spark 可以在节点发生故障时重新计算丢失的分区。最后,缓存的 RDD 存在于 运行 应用程序的上下文中,一旦应用程序终止,缓存的 RDDs 也会被删除。
When calling cache() on a RDD, does the state of the RDD changed (and the returned RDD is just this for ease of use) or a new RDD is created the wrapped the existing one
/**
* Mark this RDD for persisting using the specified level.
*
* @param newLevel the target storage level
* @param allowOverride whether to override any existing level with the new one
*/
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
storageLevel = newLevel
this
}
缓存不会对上述 RDD 造成任何副作用。如果它已经被标记为持久化,则什么也不会发生。如果不是,唯一的副作用是将其注册到 SparkContext
,其中副作用不在 RDD
本身,而是上下文。
编辑:
看JavaRDD.cache
,好像是底层调用会引起另外一个JavaRDD
的分配:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
其中 wrapRDD
调用 JavaRDD.fromRDD
:
object JavaRDD {
implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}
这会导致分配一个新的JavaRDD
。也就是说,RDD[T]
的内部实例将保持不变。