运行 PySpark-SQL 在 hadoop 集群上执行时间长?

Long execution time when running PySpark-SQL on hadoop cluster?

我有一组天气数据,我正在尝试查询它以获得每年的平均低点和平均高点。我可以毫无问题地提交作业并获得所需的结果,但 运行 需要几个小时。我认为它会 运行 快得多,我是做错了什么还是没有我想象的那么快?

数据是一个包含超过 100,000,000 个条目的 csv 文件。 列是日期、气象站、测量值(TMAX 或 TMIN)和值

我运行正在我大学的 hadoop 集群上工作,我没有比集群更多的信息。

提前致谢!

import sys
from random import random
from operator import add
from pyspark.sql import SQLContext, Row
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonPi")
    sqlContext = SQLContext(sc)
    file = sys.argv[1]
    lines = sc.textFile(file)
    parts = lines.map(lambda l: l.split(","))
    obs = parts.map(lambda p: Row(station=p[0], date=int(p[1]) , measurement=p[2] , value=p[3]  )  ) 
    weather = sqlContext.createDataFrame(obs)
    weather.registerTempTable("weather")


    #AVERAGE TMAX/TMIN PER YEAR
    query2 = sqlContext.sql("""select SUBSTRING(date,1,4) as Year, avg(value)as Average, measurement
                                from weather
                                where value<130 AND value>-40 
                                group by measurement, SUBSTRING(date,1,4) 
                                order by SUBSTRING(date,1,4) """)

    query2.show()
    query2.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("hdfs:/user/adduccij/tmax_tmin_year.csv")
    sc.stop()


确保 spark 作业实际上是在集群(而不是本地)模式下启动的。例如如果您使用的是纱线,那么作业将以 'yarn-client' 模式启动。

如果是这样,请确保您提供了足够的#executors/cores/ 执行程序和驱动程序内存。您可以从资源管理器(例如 yarn)页面或 spark 上下文(sqlContext.getAllConfs)获取实际的 cluster/job 信息。

10000万条记录并不少。假设每条记录是 30 字节,总大小仍然是 3gb,如果你只有少数执行者,这可能需要一段时间。

假设上述建议没有帮助,则尝试找出查询的哪一部分花费了很长时间。一些加速技巧是:

  • 缓存天气数据帧

  • 将查询分为两部分:第一部分进行分组,并缓存输出

  • 第二部分按

  • 排序
  • 而不是合并,使用默认分片编写 rdd,然后执行 mergeFrom 以从 shell.

  • 获取 csv 输出