什么时候执行缓存和持久化(因为它们看起来不像动作)?
When are cache and persist executed (since they don't seem like actions)?
我正在实现一个 spark 应用程序,下面是一个示例片段(不是完全相同的代码):
val rdd1 = sc.textfile(HDFS_PATH)
val rdd2 = rdd1.map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.count)
从 Spark Application Master UI 检查此代码的性能时,我看到 count
操作的条目,但没有 persist
的条目。此计数操作的 DAG 也有一个用于 'map' 转换的节点(上述代码的第 2 行)。
是否可以安全地断定在遇到 count
(最后一行)时执行映射转换,而不是在遇到 persist
时执行?
此外,rdd2 究竟在什么时候持久化了?
我知道在 RDD 上只能调用两种类型的操作——转换和操作。如果在调用 count
操作时延迟持久化 RDD,持久化会被视为转换或操作还是两者都不是?
Dataset 的 cache
和 persist
运算符是惰性的,在您调用操作之前没有任何效果(并等到缓存完成,这是获得更好性能的额外代价稍后)。
来自 Spark 的官方文档RDD Persistence(粗体字是我的):
One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
You can mark an RDD to be persisted using the persist()
or cache()
methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
这正是某些人(以及 Spark SQL 本身!)使用以下技巧的原因:
rdd2.persist(StorageLevel.MEMORY_AND_DISK).count
触发缓存。
count
运算符相当便宜,因此实际效果是缓存几乎是在该行之后立即执行的(在缓存完成之前可能会有一小段延迟,因为它是异步执行的)。
这个count
在persist
之后的好处如下:
没有任何操作(但 count
本身)将“遭受”额外的缓存时间
这条线和使用缓存 rdd2
的地方之间的时间可能足以完全完成缓存,因此可以更好地利用时间(没有额外的“减速”用于缓存)
所以当你问:
would persist
be considered a transformation or an action or neither?
我会说两者都不是,并将其视为优化提示(可能会或可能不会执行或考虑曾经)。
使用 web UI 的存储选项卡查看哪些数据集(作为其底层 RDD)已被持久化。
您还可以使用 explain
(或简称 QueryExecution.optimizedPlan
)查看 cache
或 persist
运算符的输出。
val q1 = spark.range(10).groupBy('id % 5).agg(count("*") as "count").cache
scala> q1.explain
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [(id % 5)#120L, count#119L]
+- InMemoryRelation [(id % 5)#120L, count#119L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
+- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)])
+- *(1) Range (0, 10, step=1, splits=16)
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#5L, count#4L], StorageLevel(disk, memory, deserialized, 1 replicas)
01 +- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)], output=[(id % 5)#5L, count#4L])
02 +- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
03 +- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)], output=[(id#0L % 5)#8L, count#10L])
04 +- *(1) Range (0, 10, step=1, splits=16)
请注意,上面的 count
是一个标准函数,不是一个动作,不会发生缓存。 count
是标准函数和数据集操作的名称只是巧合。
您可以使用纯 SQL 缓存 table(这很急切!)
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
*(1) ColumnarToRow
+- Scan In-memory table `range5` [id#51L]
+- InMemoryRelation [id#51L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Range (0, 5, step=1, splits=16)
InMemoryTableScan
物理运算符(使用 InMemoryRelation
逻辑计划)是确保查询缓存在内存中并因此重用的方法。
此外,Spark SQL 本身使用相同的模式来触发 SQL 的 CACHE TABLE query 的 DataFrame 缓存(与 RDD 缓存不同,默认情况下是急切的):
if (!isLazy) {
// Performs eager caching
sparkSession.table(tableIdent).count()
}
这意味着就缓存而言,根据运算符的不同,您可能会得到不同的结果。 cache
和 persist
运算符默认是懒惰的,而 SQL 的 CACHE TABLE
是急切的。
Is it safe to conclude that the map transformation is executed when count (in the last line) is encountered, and not when persist is encountered?
是
Also, at what point is rdd2 actually persisted?
在执行count语句的同时读取、映射、持久化数据
would persist be considered a transformation or an action or neither?
也不是,但是从完成的处理工作来看,你可以认为它是一个转换。 Spark 是惰性的,只有在你要求结果时才会工作。持久化数据框时不需要结果,因此 Spark 不起作用。这样一来,persist
就像是一个变换
我正在实现一个 spark 应用程序,下面是一个示例片段(不是完全相同的代码):
val rdd1 = sc.textfile(HDFS_PATH)
val rdd2 = rdd1.map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.count)
从 Spark Application Master UI 检查此代码的性能时,我看到 count
操作的条目,但没有 persist
的条目。此计数操作的 DAG 也有一个用于 'map' 转换的节点(上述代码的第 2 行)。
是否可以安全地断定在遇到 count
(最后一行)时执行映射转换,而不是在遇到 persist
时执行?
此外,rdd2 究竟在什么时候持久化了?
我知道在 RDD 上只能调用两种类型的操作——转换和操作。如果在调用 count
操作时延迟持久化 RDD,持久化会被视为转换或操作还是两者都不是?
Dataset 的 cache
和 persist
运算符是惰性的,在您调用操作之前没有任何效果(并等到缓存完成,这是获得更好性能的额外代价稍后)。
来自 Spark 的官方文档RDD Persistence(粗体字是我的):
One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
You can mark an RDD to be persisted using the
persist()
orcache()
methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
这正是某些人(以及 Spark SQL 本身!)使用以下技巧的原因:
rdd2.persist(StorageLevel.MEMORY_AND_DISK).count
触发缓存。
count
运算符相当便宜,因此实际效果是缓存几乎是在该行之后立即执行的(在缓存完成之前可能会有一小段延迟,因为它是异步执行的)。
这个count
在persist
之后的好处如下:
没有任何操作(但
count
本身)将“遭受”额外的缓存时间这条线和使用缓存
rdd2
的地方之间的时间可能足以完全完成缓存,因此可以更好地利用时间(没有额外的“减速”用于缓存)
所以当你问:
would
persist
be considered a transformation or an action or neither?
我会说两者都不是,并将其视为优化提示(可能会或可能不会执行或考虑曾经)。
使用 web UI 的存储选项卡查看哪些数据集(作为其底层 RDD)已被持久化。
您还可以使用 explain
(或简称 QueryExecution.optimizedPlan
)查看 cache
或 persist
运算符的输出。
val q1 = spark.range(10).groupBy('id % 5).agg(count("*") as "count").cache
scala> q1.explain
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [(id % 5)#120L, count#119L]
+- InMemoryRelation [(id % 5)#120L, count#119L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
+- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)])
+- *(1) Range (0, 10, step=1, splits=16)
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#5L, count#4L], StorageLevel(disk, memory, deserialized, 1 replicas)
01 +- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)], output=[(id % 5)#5L, count#4L])
02 +- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
03 +- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)], output=[(id#0L % 5)#8L, count#10L])
04 +- *(1) Range (0, 10, step=1, splits=16)
请注意,上面的 count
是一个标准函数,不是一个动作,不会发生缓存。 count
是标准函数和数据集操作的名称只是巧合。
您可以使用纯 SQL 缓存 table(这很急切!)
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
*(1) ColumnarToRow
+- Scan In-memory table `range5` [id#51L]
+- InMemoryRelation [id#51L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Range (0, 5, step=1, splits=16)
InMemoryTableScan
物理运算符(使用 InMemoryRelation
逻辑计划)是确保查询缓存在内存中并因此重用的方法。
此外,Spark SQL 本身使用相同的模式来触发 SQL 的 CACHE TABLE query 的 DataFrame 缓存(与 RDD 缓存不同,默认情况下是急切的):
if (!isLazy) {
// Performs eager caching
sparkSession.table(tableIdent).count()
}
这意味着就缓存而言,根据运算符的不同,您可能会得到不同的结果。 cache
和 persist
运算符默认是懒惰的,而 SQL 的 CACHE TABLE
是急切的。
Is it safe to conclude that the map transformation is executed when count (in the last line) is encountered, and not when persist is encountered?
是
Also, at what point is rdd2 actually persisted?
在执行count语句的同时读取、映射、持久化数据
would persist be considered a transformation or an action or neither?
也不是,但是从完成的处理工作来看,你可以认为它是一个转换。 Spark 是惰性的,只有在你要求结果时才会工作。持久化数据框时不需要结果,因此 Spark 不起作用。这样一来,persist
就像是一个变换