如何在 Databricks 中迭代以读取存储在数据湖不同子目录中的数百个文件?
How to iterate in Databricks to read hundreds of files stored in different subdirectories in a Data Lake?
我必须从 Azure Data Lake Gen2 中读取 Databricks 中的数百个 avro 文件,从每个文件中的 Body 字段中提取数据,并将所有提取的数据连接到一个唯一的数据框中。关键是所有要读取的 avro 文件都 存储在湖中的不同子目录 中,遵循以下模式:
root/YYYY/MM/DD/HH/mm/ss.avro
这迫使我循环摄取和选择数据。我正在使用此 Python 代码,其中 list_avro_files 是所有文件的路径列表:
list_data = []
for file_avro in list_avro_files:
df = spark.read.format('avro').load(file_avro)
data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
list_data.append(data1)
data = reduce(DataFrame.unionAll, list_data)
有什么方法可以更有效地做到这一点?我怎样才能 parallelize/speed 启动这个过程?
只要你的list_avro_files
可以通过标准的通配符语法来表达,你大概可以利用Spark自带的并行读操作的能力。您只需要为您的 avro 文件指定 basepath
和文件名模式:
scala> var df = spark.read
.option("basepath","/user/hive/warehouse/root")
.format("avro")
.load("/user/hive/warehouse/root/*/*/*/*.avro")
并且,如果您发现需要确切知道任何给定行来自哪个文件,请使用 input_file_name()
内置函数来丰富您的数据框:
scala> df = df.withColumn("source",input_file_name())
我必须从 Azure Data Lake Gen2 中读取 Databricks 中的数百个 avro 文件,从每个文件中的 Body 字段中提取数据,并将所有提取的数据连接到一个唯一的数据框中。关键是所有要读取的 avro 文件都 存储在湖中的不同子目录 中,遵循以下模式:
root/YYYY/MM/DD/HH/mm/ss.avro
这迫使我循环摄取和选择数据。我正在使用此 Python 代码,其中 list_avro_files 是所有文件的路径列表:
list_data = []
for file_avro in list_avro_files:
df = spark.read.format('avro').load(file_avro)
data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
list_data.append(data1)
data = reduce(DataFrame.unionAll, list_data)
有什么方法可以更有效地做到这一点?我怎样才能 parallelize/speed 启动这个过程?
只要你的list_avro_files
可以通过标准的通配符语法来表达,你大概可以利用Spark自带的并行读操作的能力。您只需要为您的 avro 文件指定 basepath
和文件名模式:
scala> var df = spark.read
.option("basepath","/user/hive/warehouse/root")
.format("avro")
.load("/user/hive/warehouse/root/*/*/*/*.avro")
并且,如果您发现需要确切知道任何给定行来自哪个文件,请使用 input_file_name()
内置函数来丰富您的数据框:
scala> df = df.withColumn("source",input_file_name())