如何缓存来自 sql 查询的结果数据帧
How to cache resulting dataframe from sql query
我在 SO 上看到了几个关于缓存 sql 表的问题,但其中 none 似乎完全回答了我的问题。
查询生成的数据帧(来自 sqlContext.sql("..."))似乎不像常规数据帧那样可缓存。
这是一些示例代码 (spark 2.2):
import org.apache.spark.sql._
def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined
val df = List("1", "2", "3").toDF.cache
df.show
isCached(df) // 1) Here, isCached returns 'true'
df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")
val df2 = spark.sqlContext.sql("select value, count(*) from myTable group by value").cache
df2.show
isCached(df2) // 2) (???) returns 'false'
val df3 = spark.sqlContext.sql("select value, 'a', count(*) from myTable group by value")
df3.registerTempTable("x")
spark.sqlContext.cacheTable("x")
df3.show
spark.catalog.isCached("x") // Returns 'true'
isCached(df3) // 3) (???) Returns 'false'
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // 4) (???) Returns 'false'
Spark UI 显示了一些与 df2 关联的存储,但它似乎与 df 相关联。通常,我们先 .cache()
然后 .count()
实现,然后在不再需要时 unpersist
父数据框。在此示例中,当取消持久化 df
时,在 spark UI 中看到的 df2
和 df3
的存储也会消失。
那么我们如何让 (2)、(3) 或最重要的 (4) 变为 return true?
一段时间后,我认为 post 回答我的问题可能会有用。
诀窍是用新的数据框截断关系谱系。
为此,我打电话给 spark.createDataFrame(df.rdd, df.schema).cache()
。
其他人建议调用 rdd.cache.count
但这似乎比创建一个新的而不具体化底层 rdd 效率低得多。
import org.apache.spark.sql._
def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined
val df = List("1", "2", "3").toDF.cache
df.count // cache the df.
isCached(df) // 1) Here, isCached returns 'true'
df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")
val df2Temp = spark.sqlContext.sql("select value, count(*) from myTable group by value")
// truncate lineage and materialize new dataframe
val df2Cached = spark.createDataFrame(df2Temp.rdd, df2Temp.schema).cache
df2Cached.count
isCached(df2Cached) // 2) returns 'true'
df2Cached.createOrReplaceTempView("x")
// Still cached
isCached(df)
spark.catalog.isCached("myTable")
// parent df not needed anymore
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // Still cached
我在 SO 上看到了几个关于缓存 sql 表的问题,但其中 none 似乎完全回答了我的问题。
查询生成的数据帧(来自 sqlContext.sql("..."))似乎不像常规数据帧那样可缓存。
这是一些示例代码 (spark 2.2):
import org.apache.spark.sql._
def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined
val df = List("1", "2", "3").toDF.cache
df.show
isCached(df) // 1) Here, isCached returns 'true'
df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")
val df2 = spark.sqlContext.sql("select value, count(*) from myTable group by value").cache
df2.show
isCached(df2) // 2) (???) returns 'false'
val df3 = spark.sqlContext.sql("select value, 'a', count(*) from myTable group by value")
df3.registerTempTable("x")
spark.sqlContext.cacheTable("x")
df3.show
spark.catalog.isCached("x") // Returns 'true'
isCached(df3) // 3) (???) Returns 'false'
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // 4) (???) Returns 'false'
Spark UI 显示了一些与 df2 关联的存储,但它似乎与 df 相关联。通常,我们先 .cache()
然后 .count()
实现,然后在不再需要时 unpersist
父数据框。在此示例中,当取消持久化 df
时,在 spark UI 中看到的 df2
和 df3
的存储也会消失。
那么我们如何让 (2)、(3) 或最重要的 (4) 变为 return true?
一段时间后,我认为 post 回答我的问题可能会有用。
诀窍是用新的数据框截断关系谱系。
为此,我打电话给 spark.createDataFrame(df.rdd, df.schema).cache()
。
其他人建议调用 rdd.cache.count
但这似乎比创建一个新的而不具体化底层 rdd 效率低得多。
import org.apache.spark.sql._
def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined
val df = List("1", "2", "3").toDF.cache
df.count // cache the df.
isCached(df) // 1) Here, isCached returns 'true'
df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")
val df2Temp = spark.sqlContext.sql("select value, count(*) from myTable group by value")
// truncate lineage and materialize new dataframe
val df2Cached = spark.createDataFrame(df2Temp.rdd, df2Temp.schema).cache
df2Cached.count
isCached(df2Cached) // 2) returns 'true'
df2Cached.createOrReplaceTempView("x")
// Still cached
isCached(df)
spark.catalog.isCached("myTable")
// parent df not needed anymore
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // Still cached