Spark Dataframe 并行读取
Spark Dataframe parallel read
使用 pyspark 时,您可以在 sc.textFile
方法中设置 reduce 的数量,这样您就可以更快地从 S3 读取文件,如 here 所述。这很好用,但从 Spark 1.3 开始,我们也可以开始使用 DataFrames。
Spark DataFrames 也可以这样吗?我正在尝试将它们从 S3 加载到 spark 集群(这是通过 ec2-spark 创建的)。基本上,对于非常大的 'data.json' 文件,我试图让这段代码 运行 快速:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(CLUSTER_URL, 'ipython-notebook')
sqlContext = SQLContext(sc)
df = sqlContext.jsonFile('s3n://bucket/data.json').cache()
实际上有一个与此相关的 TODO 注释 here and I created the corresponding issue here,因此如果您需要的话,您可以对其进行投票。
此致,
奥利维尔
在等待问题得到解决的过程中,我找到了一个暂时有效的解决方法。 .json
文件包含每一行的字典,所以我可以做的是首先将其作为 RDD 文本文件读入,然后通过手动指定列将其转换为数据框:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
sc = SparkContext(CLUSTER_URL, 'ipython-notebook')
sqlContext = SQLContext(sc)
data = sqlContext.textFile('s3n://bucket/data.json', 30).cache()
df_rdd = data\
.map(lambda x : dict(eval(x)))\
.map(lambda x : Row(x1=x['x1'], x2=x['x2'], x3=x['x3'], x4=x['x4']))
df = sqlContext.inferSchema(df_rdd).cache()
根据 docs。这也意味着您可以使用 .csv
文件而不是 json 文件(通常可以节省大量磁盘 space),只要您在 spark.conf 中手动指定列名即可。
使用 pyspark 时,您可以在 sc.textFile
方法中设置 reduce 的数量,这样您就可以更快地从 S3 读取文件,如 here 所述。这很好用,但从 Spark 1.3 开始,我们也可以开始使用 DataFrames。
Spark DataFrames 也可以这样吗?我正在尝试将它们从 S3 加载到 spark 集群(这是通过 ec2-spark 创建的)。基本上,对于非常大的 'data.json' 文件,我试图让这段代码 运行 快速:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(CLUSTER_URL, 'ipython-notebook')
sqlContext = SQLContext(sc)
df = sqlContext.jsonFile('s3n://bucket/data.json').cache()
实际上有一个与此相关的 TODO 注释 here and I created the corresponding issue here,因此如果您需要的话,您可以对其进行投票。
此致,
奥利维尔
在等待问题得到解决的过程中,我找到了一个暂时有效的解决方法。 .json
文件包含每一行的字典,所以我可以做的是首先将其作为 RDD 文本文件读入,然后通过手动指定列将其转换为数据框:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
sc = SparkContext(CLUSTER_URL, 'ipython-notebook')
sqlContext = SQLContext(sc)
data = sqlContext.textFile('s3n://bucket/data.json', 30).cache()
df_rdd = data\
.map(lambda x : dict(eval(x)))\
.map(lambda x : Row(x1=x['x1'], x2=x['x2'], x3=x['x3'], x4=x['x4']))
df = sqlContext.inferSchema(df_rdd).cache()
根据 docs。这也意味着您可以使用 .csv
文件而不是 json 文件(通常可以节省大量磁盘 space),只要您在 spark.conf 中手动指定列名即可。