DataFrame persist 不会提高 Spark 中的性能
DataFrame persist does not improve performance in Spark
我正在编写一个 Scala 脚本,该脚本从 table 中读取、转换数据并使用 Spark 显示结果。我正在使用 Spark 2.1.1.2 和 Scala 2.11.8。我在脚本中使用了两次数据框实例(下面代码中的df2
)。由于数据帧是在对其调用操作时计算的,而不是在声明它们时计算的,因此我预测该数据帧将被计算两次。我认为坚持这个数据框会提高性能,认为它会被计算一次(当坚持时),而不是两次,如果坚持的话。
然而,脚本 运行 在我坚持时比不坚持时多持续 10 秒。我不知道这是什么原因。如果有人有想法,将不胜感激。
我的提交命令行如下:
spark-submit --class TestQuery --master yarn --driver-memory 10G --executor-memory 10G --executor-cores 2 --num-executors 4 /home/bcp_data/test/target/TestQuery-1.0-SNAPSHOT.jar
Scala 脚本如下:
val spark = SparkSession
.builder()
.appName("TestQuery")
.config("spark.sql.warehouse.dir", "file:/tmp/hsperfdata_hdfs/spark-warehouse/")
.enableHiveSupport()
.getOrCreate()
val m = spark.sql("select id, startdate, enddate, status from members")
val l = spark.sql("select mid, no, status, potential from log")
val r = spark.sql("select mid, code from records")
val df1 = m.filter(($"status".isin(1,2).and($"startdate" <= one_year_ago)).and((($"enddate" >= one_year_ago)))
val df2 = df1.select($"id", $"code").join(l, "mid").filter(($"status".equalTo(1)).and($"potential".notEqual(9))).select($"no", $"id", $"code")
df2.persist
val df3 = df2.join(r, df2("id").equalTo(r("mid"))).filter($"code".isin("0001","0010","0015","0003","0012","0014","0032","0033")).groupBy($"code").agg(countDistinct($"no"))
val fa = spark.sql("select mid, acode from actions")
val fc = spark.sql("select dcode, fcode from params.codes")
val df5 = fa.join(fc, fa("acode").startsWith(fc("dcode")), "left_outer").select($"mid", $"fcode")
val df6 = df2.join(df5, df2("id").equalTo(df5("mid"))).groupBy($"code", $"fcode")
println("count1: " + df3.count + " count2: " + df6.count)
在这里使用缓存是正确的选择,但是你的声明
df2.persist
没有效果,因为您没有使用返回的数据帧。就这样
val df2 = df1.select($"id", $"code")
.join(l, "mid")
.filter(($"status".equalTo(1)).and($"potential".notEqual(9)))
.select($"no", $"id", $"code")
.persist
我正在编写一个 Scala 脚本,该脚本从 table 中读取、转换数据并使用 Spark 显示结果。我正在使用 Spark 2.1.1.2 和 Scala 2.11.8。我在脚本中使用了两次数据框实例(下面代码中的df2
)。由于数据帧是在对其调用操作时计算的,而不是在声明它们时计算的,因此我预测该数据帧将被计算两次。我认为坚持这个数据框会提高性能,认为它会被计算一次(当坚持时),而不是两次,如果坚持的话。
然而,脚本 运行 在我坚持时比不坚持时多持续 10 秒。我不知道这是什么原因。如果有人有想法,将不胜感激。
我的提交命令行如下:
spark-submit --class TestQuery --master yarn --driver-memory 10G --executor-memory 10G --executor-cores 2 --num-executors 4 /home/bcp_data/test/target/TestQuery-1.0-SNAPSHOT.jar
Scala 脚本如下:
val spark = SparkSession
.builder()
.appName("TestQuery")
.config("spark.sql.warehouse.dir", "file:/tmp/hsperfdata_hdfs/spark-warehouse/")
.enableHiveSupport()
.getOrCreate()
val m = spark.sql("select id, startdate, enddate, status from members")
val l = spark.sql("select mid, no, status, potential from log")
val r = spark.sql("select mid, code from records")
val df1 = m.filter(($"status".isin(1,2).and($"startdate" <= one_year_ago)).and((($"enddate" >= one_year_ago)))
val df2 = df1.select($"id", $"code").join(l, "mid").filter(($"status".equalTo(1)).and($"potential".notEqual(9))).select($"no", $"id", $"code")
df2.persist
val df3 = df2.join(r, df2("id").equalTo(r("mid"))).filter($"code".isin("0001","0010","0015","0003","0012","0014","0032","0033")).groupBy($"code").agg(countDistinct($"no"))
val fa = spark.sql("select mid, acode from actions")
val fc = spark.sql("select dcode, fcode from params.codes")
val df5 = fa.join(fc, fa("acode").startsWith(fc("dcode")), "left_outer").select($"mid", $"fcode")
val df6 = df2.join(df5, df2("id").equalTo(df5("mid"))).groupBy($"code", $"fcode")
println("count1: " + df3.count + " count2: " + df6.count)
在这里使用缓存是正确的选择,但是你的声明
df2.persist
没有效果,因为您没有使用返回的数据帧。就这样
val df2 = df1.select($"id", $"code")
.join(l, "mid")
.filter(($"status".equalTo(1)).and($"potential".notEqual(9)))
.select($"no", $"id", $"code")
.persist