在 Pyspark Databricks 中处理 1000 个 JSON 文件
working with 1000's of JSON files in Pyspark Databricks
我有大约 2.5 k JSON 个文件,每个 JSON 个文件代表 1 行。对于这些文件,我需要做一些非常简单的 ETL 并将它们移动到我的数据湖的 curated
部分。
我遍历我的数据湖并通过一个简单的 .read
调用调用我的 JSON 文件我事先定义了我的 JSON 模式。
然后我做我的 ETL 并尝试将这些文件写入我的数据湖的一个单独部分,但是写入部分 非常 慢,写一个文件花了 15 分钟只有几百kb的文件?
rp = spark.read.json(paths, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())
for iter in iterable:
#do stuff
# filter my sparkDF with .filter
SparkDF_F = sparkDF.filter(...)
sparkDF_F.write('path/filename.parquet')
我尝试使用 'OPTIMIZE' 并在我的路径中调用它
%sql
OPTIMIZE delta.'dbfs:/mnt/raw/data/table'
抛出以下错误。
Error in SQL statement: ParseException:
mismatched input 'dbfs:/mnt/raw/data/table' expecting {'SELECT', 'FROM', '
ADD', 'AS', 'TIMESTAMP', 'VERSION', 'ALL', 'ANY', 'DISTINCT',
'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER....
有人能指导我解决我在这里的误解吗?
设置
- Azure Databricks
- 6.0
- Spark 2.4
- Python 3.6
- 42GB 集群,12 核。
- 4 个节点
- Azure Gen1 DataLake。
两件事:
如果2.5k JSON 个文件存放在同一个文件夹中。您可以使用相同的文件夹路径直接阅读它们:
rp = spark.read.json(path_common, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())
然后,您可以在整个数据框中应用 rp.filter,因为它只有一个(不需要对每个文件进行迭代)
- 根据Delta的文档,只能优化一个table(存储在dbfs中),不能直接优化一个DBFS文件。因此,您可以使用 dbfs 中指向的目录创建 table,并按照文档中的建议使用优化:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html
希望对您有所帮助
我有大约 2.5 k JSON 个文件,每个 JSON 个文件代表 1 行。对于这些文件,我需要做一些非常简单的 ETL 并将它们移动到我的数据湖的 curated
部分。
我遍历我的数据湖并通过一个简单的 .read
调用调用我的 JSON 文件我事先定义了我的 JSON 模式。
然后我做我的 ETL 并尝试将这些文件写入我的数据湖的一个单独部分,但是写入部分 非常 慢,写一个文件花了 15 分钟只有几百kb的文件?
rp = spark.read.json(paths, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())
for iter in iterable:
#do stuff
# filter my sparkDF with .filter
SparkDF_F = sparkDF.filter(...)
sparkDF_F.write('path/filename.parquet')
我尝试使用 'OPTIMIZE' 并在我的路径中调用它
%sql
OPTIMIZE delta.'dbfs:/mnt/raw/data/table'
抛出以下错误。
Error in SQL statement: ParseException:
mismatched input 'dbfs:/mnt/raw/data/table' expecting {'SELECT', 'FROM', '
ADD', 'AS', 'TIMESTAMP', 'VERSION', 'ALL', 'ANY', 'DISTINCT',
'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER....
有人能指导我解决我在这里的误解吗?
设置
- Azure Databricks
- 6.0
- Spark 2.4
- Python 3.6
- 42GB 集群,12 核。
- 4 个节点
- Azure Gen1 DataLake。
两件事:
如果2.5k JSON 个文件存放在同一个文件夹中。您可以使用相同的文件夹路径直接阅读它们:
rp = spark.read.json(path_common, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())
然后,您可以在整个数据框中应用 rp.filter,因为它只有一个(不需要对每个文件进行迭代)
- 根据Delta的文档,只能优化一个table(存储在dbfs中),不能直接优化一个DBFS文件。因此,您可以使用 dbfs 中指向的目录创建 table,并按照文档中的建议使用优化:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html
希望对您有所帮助