如何在 Databricks 中迭代以读取存储在数据湖不同子目录中的数百个文件?

How to iterate in Databricks to read hundreds of files stored in different subdirectories in a Data Lake?

我必须从 Azure Data Lake Gen2 中读取 Databricks 中的数百个 avro 文件,从每个文件中的 Body 字段中提取数据,并将所有提取的数据连接到一个唯一的数据框中。关键是所有要读取的 avro 文件都 存储在湖中的不同子目录 中,遵循以下模式:

root/YYYY/MM/DD/HH/mm/ss.avro

这迫使我循环摄取和选择数据。我正在使用此 Python 代码,其中 list_avro_files 是所有文件的路径列表:

list_data = []

for file_avro in list_avro_files:
  df = spark.read.format('avro').load(file_avro)
  data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
  list_data.append(data1)

data = reduce(DataFrame.unionAll, list_data)

有什么方法可以更有效地做到这一点?我怎样才能 parallelize/speed 启动这个过程?

只要你的list_avro_files可以通过标准的通配符语法来表达,你大概可以利用Spark自带的并行读操作的能力。您只需要为您的 avro 文件指定 basepath 和文件名模式:

scala> var df = spark.read
                 .option("basepath","/user/hive/warehouse/root")
                 .format("avro")
                 .load("/user/hive/warehouse/root/*/*/*/*.avro")

并且,如果您发现需要确切知道任何给定行来自哪个文件,请使用 input_file_name() 内置函数来丰富您的数据框:

scala> df = df.withColumn("source",input_file_name())