Spark Dataframe 并行读取

Spark Dataframe parallel read

使用 pyspark 时,您可以在 sc.textFile 方法中设置 reduce 的数量,这样您就可以更快地从 S3 读取文件,如 here 所述。这很好用,但从 Spark 1.3 开始,我们也可以开始使用 DataFrames。

Spark DataFrames 也可以这样吗?我正在尝试将它们从 S3 加载到 spark 集群(这是通过 ec2-spark 创建的)。基本上,对于非常大的 'data.json' 文件,我试图让这段代码 运行 快速:

from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(CLUSTER_URL, 'ipython-notebook')
sqlContext = SQLContext(sc)
df = sqlContext.jsonFile('s3n://bucket/data.json').cache()

实际上有一个与此相关的 TODO 注释 here and I created the corresponding issue here,因此如果您需要的话,您可以对其进行投票。

此致,

奥利维尔

在等待问题得到解决的过程中,我找到了一个暂时有效的解决方法。 .json 文件包含每一行的字典,所以我可以做的是首先将其作为 RDD 文本文件读入,然后通过手动指定列将其转换为数据框:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
sc = SparkContext(CLUSTER_URL, 'ipython-notebook')
sqlContext = SQLContext(sc)
data = sqlContext.textFile('s3n://bucket/data.json', 30).cache()
df_rdd = data\
    .map(lambda x : dict(eval(x)))\
    .map(lambda x : Row(x1=x['x1'], x2=x['x2'], x3=x['x3'], x4=x['x4']))
df = sqlContext.inferSchema(df_rdd).cache()

根据 docs。这也意味着您可以使用 .csv 文件而不是 json 文件(通常可以节省大量磁盘 space),只要您在 spark.conf 中手动指定列名即可。