如何在 PySpark 中读取大型 JSON 数组文件

How do I read a Large JSON Array File in PySpark

问题

我最近在尝试读取大型 UTF-8 JSON 数组文件并切换到 HDInsight PySpark(v2.x,而非 3)进行处理时遇到了 Azure Data Lake Analytics 中的挑战文件。该文件是 ~110G,有 ~150m JSON 个对象。

HDInsight PySpark 似乎不支持输入 JSON 文件格式数组,所以我被卡住了。另外,我有 "many" 这样的文件,每个文件都包含数百列,每个文件都有不同的架构,因此目前不能为这些创建架构。

问题

如何在 HDInsight 上使用 PySpark 2 中开箱即用的功能来使这些文件能够被读取为 JSON?

谢谢,

J

我尝试过的东西

我使用了本页底部的方法: from Databricks 提供了以下代码片段:

import json

df = sc.wholeTextFiles('/tmp/*.json').flatMap(lambda x: json.loads(x[1])).toDF()
display(df)

我尝试了上面的方法,不理解 "wholeTextFiles" 是如何工作的,当然 运行 进入了 OutOfMemory 错误,这些错误很快杀死了我的执行者。

我尝试加载到 RDD 和其他开放方法,但 PySpark 似乎只支持 JSON 行 JSON 文件格式,我有 JSON 对象数组ADLA 对该文件格式的要求。

我尝试以文本文件的形式读入,剥离数组字符,在 JSON 对象边界上拆分并像上面那样转换为 JSON,但是一直出现无法转换的错误unicode and/or str (ings).

我找到了解决上述问题的方法,并转换为包含一列的数据框,其中包含作为 JSON 对象的字符串行。但是,我没有找到一种方法将数据框行中的 JSON 字符串单独输出到输出文件。总是出现

{'dfColumnName':'{...json_string_as_value}'}

我还尝试了接受上述行的映射函数,解析为 JSON,提取值(JSON 我想要),然后将值解析为 JSON。这似乎有效,但是当我尝试保存时,RDD 是 PipelineRDD 类型并且没有 saveAsTextFile() 方法。然后我尝试了 toJSON 方法,但不断收到关于 "found no valid JSON Object" 的错误,我承认我不明白,当然还有其他转换错误。

我终于找到了前进的方向。我了解到我可以直接从 RDD 读取 json,包括 PipelineRDD。我找到了一种方法来删除 unicode 字节顺序 header,包装数组方括号,根据幸运分隔符拆分 JSON Objects,并拥有分布式数据集以进行更高效的处理。输出数据框现在有以 JSON 元素命名的列,推断模式,并动态适应其他文件格式。

这是代码 - 希望对您有所帮助!:

#...Spark considers arrays of Json objects to be an invalid format
#    and unicode files are prefixed with a byteorder marker
#
thanksMoiraRDD = sc.textFile( '/a/valid/file/path', partitions ).map(
    lambda x: x.encode('utf-8','ignore').strip(u",\r\n[]\ufeff") 
)

df = sqlContext.read.json(thanksMoiraRDD)