Spark过滤是否重新加载数据?
Does Spark filtering reload the data?
这是我真正简单的 Spark 作业的主体...
def hBaseRDD = sc.newAPIHadoopRDD(config, TableInputFormat.class, ImmutableBytesWritable.class, Result.class)
println "${hBaseRDD.count()} records counted"
def filteredRDD = hBaseRDD.filter({ scala.Tuple2 result ->
def val = result._2.getValue(family, qualifier)
val ? new String(val) == 'twitter' : false
} as Function<Result, Boolean>)
println "${filteredRDD.count()} counted from twitter."
println "Done!"
我在 spark-submit 输出中注意到,它似乎两次转到 HBase。第一次是在 hBaseRDD
上调用 count,第二次是在 filteredRDD
上调用 filter 时。有没有办法让它在 hBaseRDD 中缓存 newAPIHadoopRDD
调用的结果,以便过滤器在仅内存中的数据副本上工作?
hbaseRDD.cache()
在计数之前就可以了。
文档详细介绍了选项:http://spark.apache.org/docs/1.2.0/programming-guide.html#rdd-persistence
这是我真正简单的 Spark 作业的主体...
def hBaseRDD = sc.newAPIHadoopRDD(config, TableInputFormat.class, ImmutableBytesWritable.class, Result.class)
println "${hBaseRDD.count()} records counted"
def filteredRDD = hBaseRDD.filter({ scala.Tuple2 result ->
def val = result._2.getValue(family, qualifier)
val ? new String(val) == 'twitter' : false
} as Function<Result, Boolean>)
println "${filteredRDD.count()} counted from twitter."
println "Done!"
我在 spark-submit 输出中注意到,它似乎两次转到 HBase。第一次是在 hBaseRDD
上调用 count,第二次是在 filteredRDD
上调用 filter 时。有没有办法让它在 hBaseRDD 中缓存 newAPIHadoopRDD
调用的结果,以便过滤器在仅内存中的数据副本上工作?
hbaseRDD.cache()
在计数之前就可以了。
文档详细介绍了选项:http://spark.apache.org/docs/1.2.0/programming-guide.html#rdd-persistence