在 Pyspark Databricks 中处理 1000 个 JSON 文件

working with 1000's of JSON files in Pyspark Databricks

我有大约 2.5 k JSON 个文件,每个 JSON 个文件代表 1 行。对于这些文件,我需要做一些非常简单的 ETL 并将它们移动到我的数据湖的 curated 部分。

我遍历我的数据湖并通过一个简单的 .read 调用调用我的 JSON 文件我事先定义了我的 JSON 模式。

然后我做我的 ETL 并尝试将这些文件写入我的数据湖的一个单独部分,但是写入部分 非常 慢,写一个文件花了 15 分钟只有几百kb的文件?

rp  = spark.read.json(paths, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())

for iter in iterable:
    #do stuff
    # filter my sparkDF with .filter
    SparkDF_F = sparkDF.filter(...)
    sparkDF_F.write('path/filename.parquet')

我尝试使用 'OPTIMIZE' 并在我的路径中调用它

%sql
OPTIMIZE delta.'dbfs:/mnt/raw/data/table'

抛出以下错误。

Error in SQL statement: ParseException: 
mismatched input 'dbfs:/mnt/raw/data/table' expecting {'SELECT', 'FROM', '
ADD', 'AS', 'TIMESTAMP', 'VERSION', 'ALL', 'ANY', 'DISTINCT', 
'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER.... 

有人能指导我解决我在这里的误解吗?

设置

两件事:

  1. 如果2.5k JSON 个文件存放在同一个文件夹中。您可以使用相同的文件夹路径直接阅读它们:

    rp = spark.read.json(path_common, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())

然后,您可以在整个数据框中应用 rp.filter,因为它只有一个(不需要对每个文件进行迭代)

  1. 根据Delta的文档,只能优化一个table(存储在dbfs中),不能直接优化一个DBFS文件。因此,您可以使用 dbfs 中指向的目录创建 table,并按照文档中的建议使用优化:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html

希望对您有所帮助