SchemaRDD(Hive Table 扫描)未被 Spark-Shell 缓存
SchemaRDD (Hive Table Scan) not cached by Spark-Shell
我正在尝试 运行 在 Hive table(通过 DataStax 4.6 在 CFS 上托管)上使用 Spark 进行一些基本数据探索。我的数据集大约是 3.1GO,我 运行 带有 dse spark --executor-memory 16g 的 spark-shell (是的,我的执行程序上确实有 16g 可用)。所以基本上我会写入 spark-shell,如下:
val dataset = hc.sql("SELECT * FROM my_hive_table") ;
val data_sample = dataset.sample(false,.01,0) ;
data_sample.cache
然后我会尝试计数以实际缓存一些东西
data_sample.count
但是当我检查 spark-shell web UI 时,我发现没有 RDD 持续存在,如果我再次尝试计数,我的整个数据集会再次从 CFS 读取。
所以我尝试通过 CFS 直接作为文本文件访问我的数据集
textFile.type = cfs:/user/hive/warehouse/my_hive_table/aaaammjj=20150526
并调整之前的代码以计算缓存 RDD 后的行数,这次 RDD 确实在两个工作人员之间使用 7 GB 进行了缓存!来自网络 UI :
cfs:/user/hive/warehouse/my_hive_table/aaaammjj=20150526 Memory Deserialized 1x Replicated
我的 schemaRDD 没有使用 Hive 进行缓存有什么原因吗?这将非常实用,因为 schemaRDD 提供了......好吧模式。
感谢任何帮助。
根据 Spark 1.2 的 official documentation,Spark SQL 可以通过调用 sqlContext.cacheTable("tableName").
然后 Spark SQL 将只扫描需要的列,并自动调整压缩以最小化内存使用和 GC 压力。您可以调用 sqlContext.uncacheTable("tableName") 从内存中删除 table。
请注意,如果您调用 schemaRDD.cache()
而不是 sqlContext.cacheTable(...)
,table 将不会使用内存中的列格式进行缓存,因此 sqlContext.cacheTable(...)
是强推荐用于此用例。
可以使用 SQLContext 上的 setConf 方法或使用 SQL.
的 运行 SET key=value 命令来完成内存缓存的配置
所以实际上你需要使用 sqlContext.cacheTable("the name of the table you gave for your table")
来缓存你的 data_sample RDD
因此,根据我与 eliasah 的讨论,我最终可以通过 :
以某种方式缓存 table
val dataset = hc.sql("SELECT * FROM autori_daily_import")
dataset.registerTempTable("data")
hc.cacheTable("data")
hc.sql("select count(*) from data")
res22: Array[org.apache.spark.sql.Row] = Array([6409331])
hc.sql("select sens,count(*) from data group by sens").collect().foreach(println)
[A,3672249]
[E,2737082]
而且缓存中确实有一个名为"RDD Storage Info for HiveTableScan ..."
的RDD
对我来说有点模糊的是,当我有一个 schemaRDD 并且我有一个 .cache() 方法时,为什么我需要注册一个临时的 table。如果我 运行 查询 schemaRDD(使用 .select('sens).countByValue() )然后 Spark 再次扫描 Hive table 并且不使用临时内存 table。
我正在尝试 运行 在 Hive table(通过 DataStax 4.6 在 CFS 上托管)上使用 Spark 进行一些基本数据探索。我的数据集大约是 3.1GO,我 运行 带有 dse spark --executor-memory 16g 的 spark-shell (是的,我的执行程序上确实有 16g 可用)。所以基本上我会写入 spark-shell,如下:
val dataset = hc.sql("SELECT * FROM my_hive_table") ;
val data_sample = dataset.sample(false,.01,0) ;
data_sample.cache
然后我会尝试计数以实际缓存一些东西
data_sample.count
但是当我检查 spark-shell web UI 时,我发现没有 RDD 持续存在,如果我再次尝试计数,我的整个数据集会再次从 CFS 读取。
所以我尝试通过 CFS 直接作为文本文件访问我的数据集
textFile.type = cfs:/user/hive/warehouse/my_hive_table/aaaammjj=20150526
并调整之前的代码以计算缓存 RDD 后的行数,这次 RDD 确实在两个工作人员之间使用 7 GB 进行了缓存!来自网络 UI :
cfs:/user/hive/warehouse/my_hive_table/aaaammjj=20150526 Memory Deserialized 1x Replicated
我的 schemaRDD 没有使用 Hive 进行缓存有什么原因吗?这将非常实用,因为 schemaRDD 提供了......好吧模式。
感谢任何帮助。
根据 Spark 1.2 的 official documentation,Spark SQL 可以通过调用 sqlContext.cacheTable("tableName").
然后 Spark SQL 将只扫描需要的列,并自动调整压缩以最小化内存使用和 GC 压力。您可以调用 sqlContext.uncacheTable("tableName") 从内存中删除 table。
请注意,如果您调用 schemaRDD.cache()
而不是 sqlContext.cacheTable(...)
,table 将不会使用内存中的列格式进行缓存,因此 sqlContext.cacheTable(...)
是强推荐用于此用例。
可以使用 SQLContext 上的 setConf 方法或使用 SQL.
的 运行 SET key=value 命令来完成内存缓存的配置所以实际上你需要使用 sqlContext.cacheTable("the name of the table you gave for your table")
因此,根据我与 eliasah 的讨论,我最终可以通过 :
以某种方式缓存 tableval dataset = hc.sql("SELECT * FROM autori_daily_import")
dataset.registerTempTable("data")
hc.cacheTable("data")
hc.sql("select count(*) from data")
res22: Array[org.apache.spark.sql.Row] = Array([6409331])
hc.sql("select sens,count(*) from data group by sens").collect().foreach(println)
[A,3672249]
[E,2737082]
而且缓存中确实有一个名为"RDD Storage Info for HiveTableScan ..."
的RDD对我来说有点模糊的是,当我有一个 schemaRDD 并且我有一个 .cache() 方法时,为什么我需要注册一个临时的 table。如果我 运行 查询 schemaRDD(使用 .select('sens).countByValue() )然后 Spark 再次扫描 Hive table 并且不使用临时内存 table。