如何缓存来自 sql 查询的结果数据帧

How to cache resulting dataframe from sql query

我在 SO 上看到了几个关于缓存 sql 表的问题,但其中 none 似乎完全回答了我的问题。

查询生成的数据帧(来自 sqlContext.sql("..."))似乎不像常规数据帧那样可缓存。

这是一些示例代码 (spark 2.2):

import org.apache.spark.sql._

def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined

val df = List("1", "2", "3").toDF.cache
df.show
isCached(df) // 1) Here, isCached returns 'true'

df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")

val df2 = spark.sqlContext.sql("select value, count(*) from myTable group by value").cache
df2.show
isCached(df2) // 2) (???) returns 'false'

val df3 = spark.sqlContext.sql("select value, 'a', count(*) from myTable group by value")
df3.registerTempTable("x")
spark.sqlContext.cacheTable("x")
df3.show
spark.catalog.isCached("x") // Returns 'true'
isCached(df3) // 3) (???) Returns 'false'

spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'
spark.catalog.isCached("x") // 4) (???) Returns 'false'

Spark UI 显示了一些与 df2 关联的存储,但它似乎与 df 相关联。通常,我们先 .cache() 然后 .count() 实现,然后在不再需要时 unpersist 父数据框。在此示例中,当取消持久化 df 时,在 spark UI 中看到的 df2df3 的存储也会消失。

那么我们如何让 (2)、(3) 或最重要的 (4) 变为 return true?

一段时间后,我认为 post 回答我的问题可能会有用。

诀窍是用新的数据框截断关系谱系。

为此,我打电话给 spark.createDataFrame(df.rdd, df.schema).cache()。 其他人建议调用 rdd.cache.count 但这似乎比创建一个新的而不具体化底层 rdd 效率低得多。

import org.apache.spark.sql._

def isCached(df: DataFrame) = spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).isDefined

val df = List("1", "2", "3").toDF.cache
df.count // cache the df.
isCached(df) // 1) Here, isCached returns 'true'

df.createOrReplaceTempView("myTable")
spark.catalog.isCached("myTable")

val df2Temp = spark.sqlContext.sql("select value, count(*) from myTable group by value")
// truncate lineage and materialize new dataframe
val df2Cached = spark.createDataFrame(df2Temp.rdd, df2Temp.schema).cache
df2Cached.count
isCached(df2Cached) // 2) returns 'true'
df2Cached.createOrReplaceTempView("x")

// Still cached
isCached(df) 
spark.catalog.isCached("myTable")

// parent df not needed anymore
spark.sqlContext.uncacheTable("myTable")
spark.catalog.isCached("myTable") // OK: Returns 'false'
isCached(df) // OK: Returns 'false'

spark.catalog.isCached("x") // Still cached