有没有办法使用 Databricks 将多个文本文件加载到单个数据框中?

Is there a way to load multiple text files into a single dataframe using Databricks?

我正在尝试测试一些想法,以递归循环遍历文件夹和 sub-folders 中的所有文件,并将所有内容加载到单个数据框中。我有 12 种不同的文件,不同之处在于文件命名约定。因此,我有以 'ABC' 开头的文件名、以 'CN' 开头的文件名、以 'CZ' 开头的文件名,等等。我尝试了以下 3 个想法。

import pyspark  
import os.path
from pyspark.sql import SQLContext
from pyspark.sql.functions import input_file_name

df = sqlContext.read.format("com.databricks.spark.text").option("header", "false").load("dbfs/mnt/rawdata/2019/06/28/Parent/ABC*.gz")
df.withColumn('input', input_file_name())
print(dfCW)

df = sc.textFile('/mnt/rawdata/2019/06/28/Parent/ABC*.gz')
print(df)

df = sc.sequenceFile('dbfs/mnt/rawdata/2019/06/28/Parent/ABC*.gz/').toDF()
df.withColumn('input', input_file_name())
print(dfCW)

这可以使用 PySpark 或 PySpark SQL 来完成。我只需要将所有内容从数据湖加载到数据框中,这样我就可以将数据框推送到 Azure SQL 服务器中。我在 Azure Databricks 中进行所有编码。如果这是常规 Python,我可以很容易地做到这一点。我只是不太了解 PySpark,无法让它正常工作。

为了说明这一点,我有 3 个压缩文件,如下所示(ABC0006.gz、ABC00015.gz 和 ABC0022.gz):

ABC0006.gz
0x0000fa00|ABC|T3|1995
0x00102c55|ABC|K2|2017
0x00024600|ABC|V0|1993

ABC00015.gz
0x00102c54|ABC|G1|2016
0x00102cac|ABC|S4|2017
0x00038600|ABC|F6|2003

ABC0022.gz
0x00102c57|ABC|J0|2017
0x0000fa00|ABC|J6|1994
0x00102cec|ABC|V2|2017

我想将所有内容合并到一个 datdframe 中,如下所示(.gz 是文件名;每个文件都完全相同 headers):

0x0000fa00|ABC|T3|1995
0x00102c55|ABC|K2|2017
0x00024600|ABC|V0|1993
0x00102c54|ABC|G1|2016
0x00102cac|ABC|S4|2017
0x00038600|ABC|F6|2003
0x00102c57|ABC|J0|2017
0x0000fa00|ABC|J6|1994
0x00102cec|ABC|V2|2017

我有 1000 个这样的文件需要处理。幸运的是,只有 12 种不同类型的文件,因此有 12 种类型的名称...以 'ABC'、'CN'、'CZ' 等开头。感谢您查看此处。

根据你的评论,亚伯拉罕,我的代码看起来应该是这样的,对吧...

file_list=[]
path = 'dbfs/rawdata/2019/06/28/Parent/'
files  = dbutils.fs.ls(path)
for file in files:
    if(file.name.startswith('ABC')):
       file_list.append(file.name)
df = spark.read.load(path=file_list)

这是正确的还是不正确的?请指教。我认为我们很接近,但这对我来说仍然不起作用,否则我不会 re-posting 在这里。谢谢!!

PySpark 支持使用加载函数加载文件列表。我相信这就是您要找的

file_list=[]
path = 'dbfs/mnt/rawdata/2019/06/28/Parent/'
files  = dbutils.fs.ls(path)
for file in files:
    if(file.name.startswith('ABC')):
       file_list.append(file.name)
df = spark.read.load(path=file_list)

如果文件是 CSV 文件并且 header 使用下面的命令

df = spark.read.load(path=file_list,format="csv", sep=",", inferSchema="true", header="true")

更多示例代码请参考https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

我终于,终于,终于开始工作了。

val myDFCsv = spark.read.format("csv")
   .option("sep","|")
   .option("inferSchema","true")
   .option("header","false")
   .load("mnt/rawdata/2019/01/01/client/ABC*.gz")

myDFCsv.show()
myDFCsv.count()

显然所有压缩文件和推断模式任务都是自动处理的。因此,代码超级、超级轻量级​​,而且非常快。