如何在 PySpark 中读取大型 JSON 数组文件
How do I read a Large JSON Array File in PySpark
问题
我最近在尝试读取大型 UTF-8 JSON 数组文件并切换到 HDInsight PySpark(v2.x,而非 3)进行处理时遇到了 Azure Data Lake Analytics 中的挑战文件。该文件是 ~110G,有 ~150m JSON 个对象。
HDInsight PySpark 似乎不支持输入 JSON 文件格式数组,所以我被卡住了。另外,我有 "many" 这样的文件,每个文件都包含数百列,每个文件都有不同的架构,因此目前不能为这些创建架构。
问题
如何在 HDInsight 上使用 PySpark 2 中开箱即用的功能来使这些文件能够被读取为 JSON?
谢谢,
J
我尝试过的东西
我使用了本页底部的方法:
from Databricks 提供了以下代码片段:
import json
df = sc.wholeTextFiles('/tmp/*.json').flatMap(lambda x: json.loads(x[1])).toDF()
display(df)
我尝试了上面的方法,不理解 "wholeTextFiles" 是如何工作的,当然 运行 进入了 OutOfMemory 错误,这些错误很快杀死了我的执行者。
我尝试加载到 RDD 和其他开放方法,但 PySpark 似乎只支持 JSON 行 JSON 文件格式,我有 JSON 对象数组ADLA 对该文件格式的要求。
我尝试以文本文件的形式读入,剥离数组字符,在 JSON 对象边界上拆分并像上面那样转换为 JSON,但是一直出现无法转换的错误unicode and/or str (ings).
我找到了解决上述问题的方法,并转换为包含一列的数据框,其中包含作为 JSON 对象的字符串行。但是,我没有找到一种方法将数据框行中的 JSON 字符串单独输出到输出文件。总是出现
{'dfColumnName':'{...json_string_as_value}'}
我还尝试了接受上述行的映射函数,解析为 JSON,提取值(JSON 我想要),然后将值解析为 JSON。这似乎有效,但是当我尝试保存时,RDD 是 PipelineRDD 类型并且没有 saveAsTextFile() 方法。然后我尝试了 toJSON 方法,但不断收到关于 "found no valid JSON Object" 的错误,我承认我不明白,当然还有其他转换错误。
我终于找到了前进的方向。我了解到我可以直接从 RDD 读取 json,包括 PipelineRDD。我找到了一种方法来删除 unicode 字节顺序 header,包装数组方括号,根据幸运分隔符拆分 JSON Objects,并拥有分布式数据集以进行更高效的处理。输出数据框现在有以 JSON 元素命名的列,推断模式,并动态适应其他文件格式。
这是代码 - 希望对您有所帮助!:
#...Spark considers arrays of Json objects to be an invalid format
# and unicode files are prefixed with a byteorder marker
#
thanksMoiraRDD = sc.textFile( '/a/valid/file/path', partitions ).map(
lambda x: x.encode('utf-8','ignore').strip(u",\r\n[]\ufeff")
)
df = sqlContext.read.json(thanksMoiraRDD)
问题
我最近在尝试读取大型 UTF-8 JSON 数组文件并切换到 HDInsight PySpark(v2.x,而非 3)进行处理时遇到了 Azure Data Lake Analytics 中的挑战文件。该文件是 ~110G,有 ~150m JSON 个对象。
HDInsight PySpark 似乎不支持输入 JSON 文件格式数组,所以我被卡住了。另外,我有 "many" 这样的文件,每个文件都包含数百列,每个文件都有不同的架构,因此目前不能为这些创建架构。
问题
如何在 HDInsight 上使用 PySpark 2 中开箱即用的功能来使这些文件能够被读取为 JSON?
谢谢,
J
我尝试过的东西
我使用了本页底部的方法: from Databricks 提供了以下代码片段:
import json
df = sc.wholeTextFiles('/tmp/*.json').flatMap(lambda x: json.loads(x[1])).toDF()
display(df)
我尝试了上面的方法,不理解 "wholeTextFiles" 是如何工作的,当然 运行 进入了 OutOfMemory 错误,这些错误很快杀死了我的执行者。
我尝试加载到 RDD 和其他开放方法,但 PySpark 似乎只支持 JSON 行 JSON 文件格式,我有 JSON 对象数组ADLA 对该文件格式的要求。
我尝试以文本文件的形式读入,剥离数组字符,在 JSON 对象边界上拆分并像上面那样转换为 JSON,但是一直出现无法转换的错误unicode and/or str (ings).
我找到了解决上述问题的方法,并转换为包含一列的数据框,其中包含作为 JSON 对象的字符串行。但是,我没有找到一种方法将数据框行中的 JSON 字符串单独输出到输出文件。总是出现
{'dfColumnName':'{...json_string_as_value}'}
我还尝试了接受上述行的映射函数,解析为 JSON,提取值(JSON 我想要),然后将值解析为 JSON。这似乎有效,但是当我尝试保存时,RDD 是 PipelineRDD 类型并且没有 saveAsTextFile() 方法。然后我尝试了 toJSON 方法,但不断收到关于 "found no valid JSON Object" 的错误,我承认我不明白,当然还有其他转换错误。
我终于找到了前进的方向。我了解到我可以直接从 RDD 读取 json,包括 PipelineRDD。我找到了一种方法来删除 unicode 字节顺序 header,包装数组方括号,根据幸运分隔符拆分 JSON Objects,并拥有分布式数据集以进行更高效的处理。输出数据框现在有以 JSON 元素命名的列,推断模式,并动态适应其他文件格式。
这是代码 - 希望对您有所帮助!:
#...Spark considers arrays of Json objects to be an invalid format
# and unicode files are prefixed with a byteorder marker
#
thanksMoiraRDD = sc.textFile( '/a/valid/file/path', partitions ).map(
lambda x: x.encode('utf-8','ignore').strip(u",\r\n[]\ufeff")
)
df = sqlContext.read.json(thanksMoiraRDD)