有没有办法使用 Databricks 将多个文本文件加载到单个数据框中?
Is there a way to load multiple text files into a single dataframe using Databricks?
我正在尝试测试一些想法,以递归循环遍历文件夹和 sub-folders 中的所有文件,并将所有内容加载到单个数据框中。我有 12 种不同的文件,不同之处在于文件命名约定。因此,我有以 'ABC' 开头的文件名、以 'CN' 开头的文件名、以 'CZ' 开头的文件名,等等。我尝试了以下 3 个想法。
import pyspark
import os.path
from pyspark.sql import SQLContext
from pyspark.sql.functions import input_file_name
df = sqlContext.read.format("com.databricks.spark.text").option("header", "false").load("dbfs/mnt/rawdata/2019/06/28/Parent/ABC*.gz")
df.withColumn('input', input_file_name())
print(dfCW)
或
df = sc.textFile('/mnt/rawdata/2019/06/28/Parent/ABC*.gz')
print(df)
或
df = sc.sequenceFile('dbfs/mnt/rawdata/2019/06/28/Parent/ABC*.gz/').toDF()
df.withColumn('input', input_file_name())
print(dfCW)
这可以使用 PySpark 或 PySpark SQL 来完成。我只需要将所有内容从数据湖加载到数据框中,这样我就可以将数据框推送到 Azure SQL 服务器中。我在 Azure Databricks 中进行所有编码。如果这是常规 Python,我可以很容易地做到这一点。我只是不太了解 PySpark,无法让它正常工作。
为了说明这一点,我有 3 个压缩文件,如下所示(ABC0006.gz、ABC00015.gz 和 ABC0022.gz):
ABC0006.gz
0x0000fa00|ABC|T3|1995
0x00102c55|ABC|K2|2017
0x00024600|ABC|V0|1993
ABC00015.gz
0x00102c54|ABC|G1|2016
0x00102cac|ABC|S4|2017
0x00038600|ABC|F6|2003
ABC0022.gz
0x00102c57|ABC|J0|2017
0x0000fa00|ABC|J6|1994
0x00102cec|ABC|V2|2017
我想将所有内容合并到一个 datdframe 中,如下所示(.gz 是文件名;每个文件都完全相同 headers):
0x0000fa00|ABC|T3|1995
0x00102c55|ABC|K2|2017
0x00024600|ABC|V0|1993
0x00102c54|ABC|G1|2016
0x00102cac|ABC|S4|2017
0x00038600|ABC|F6|2003
0x00102c57|ABC|J0|2017
0x0000fa00|ABC|J6|1994
0x00102cec|ABC|V2|2017
我有 1000 个这样的文件需要处理。幸运的是,只有 12 种不同类型的文件,因此有 12 种类型的名称...以 'ABC'、'CN'、'CZ' 等开头。感谢您查看此处。
根据你的评论,亚伯拉罕,我的代码看起来应该是这样的,对吧...
file_list=[]
path = 'dbfs/rawdata/2019/06/28/Parent/'
files = dbutils.fs.ls(path)
for file in files:
if(file.name.startswith('ABC')):
file_list.append(file.name)
df = spark.read.load(path=file_list)
这是正确的还是不正确的?请指教。我认为我们很接近,但这对我来说仍然不起作用,否则我不会 re-posting 在这里。谢谢!!
PySpark 支持使用加载函数加载文件列表。我相信这就是您要找的
file_list=[]
path = 'dbfs/mnt/rawdata/2019/06/28/Parent/'
files = dbutils.fs.ls(path)
for file in files:
if(file.name.startswith('ABC')):
file_list.append(file.name)
df = spark.read.load(path=file_list)
如果文件是 CSV 文件并且 header 使用下面的命令
df = spark.read.load(path=file_list,format="csv", sep=",", inferSchema="true", header="true")
更多示例代码请参考https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
我终于,终于,终于开始工作了。
val myDFCsv = spark.read.format("csv")
.option("sep","|")
.option("inferSchema","true")
.option("header","false")
.load("mnt/rawdata/2019/01/01/client/ABC*.gz")
myDFCsv.show()
myDFCsv.count()
显然所有压缩文件和推断模式任务都是自动处理的。因此,代码超级、超级轻量级,而且非常快。
我正在尝试测试一些想法,以递归循环遍历文件夹和 sub-folders 中的所有文件,并将所有内容加载到单个数据框中。我有 12 种不同的文件,不同之处在于文件命名约定。因此,我有以 'ABC' 开头的文件名、以 'CN' 开头的文件名、以 'CZ' 开头的文件名,等等。我尝试了以下 3 个想法。
import pyspark
import os.path
from pyspark.sql import SQLContext
from pyspark.sql.functions import input_file_name
df = sqlContext.read.format("com.databricks.spark.text").option("header", "false").load("dbfs/mnt/rawdata/2019/06/28/Parent/ABC*.gz")
df.withColumn('input', input_file_name())
print(dfCW)
或
df = sc.textFile('/mnt/rawdata/2019/06/28/Parent/ABC*.gz')
print(df)
或
df = sc.sequenceFile('dbfs/mnt/rawdata/2019/06/28/Parent/ABC*.gz/').toDF()
df.withColumn('input', input_file_name())
print(dfCW)
这可以使用 PySpark 或 PySpark SQL 来完成。我只需要将所有内容从数据湖加载到数据框中,这样我就可以将数据框推送到 Azure SQL 服务器中。我在 Azure Databricks 中进行所有编码。如果这是常规 Python,我可以很容易地做到这一点。我只是不太了解 PySpark,无法让它正常工作。
为了说明这一点,我有 3 个压缩文件,如下所示(ABC0006.gz、ABC00015.gz 和 ABC0022.gz):
ABC0006.gz
0x0000fa00|ABC|T3|1995
0x00102c55|ABC|K2|2017
0x00024600|ABC|V0|1993
ABC00015.gz
0x00102c54|ABC|G1|2016
0x00102cac|ABC|S4|2017
0x00038600|ABC|F6|2003
ABC0022.gz
0x00102c57|ABC|J0|2017
0x0000fa00|ABC|J6|1994
0x00102cec|ABC|V2|2017
我想将所有内容合并到一个 datdframe 中,如下所示(.gz 是文件名;每个文件都完全相同 headers):
0x0000fa00|ABC|T3|1995
0x00102c55|ABC|K2|2017
0x00024600|ABC|V0|1993
0x00102c54|ABC|G1|2016
0x00102cac|ABC|S4|2017
0x00038600|ABC|F6|2003
0x00102c57|ABC|J0|2017
0x0000fa00|ABC|J6|1994
0x00102cec|ABC|V2|2017
我有 1000 个这样的文件需要处理。幸运的是,只有 12 种不同类型的文件,因此有 12 种类型的名称...以 'ABC'、'CN'、'CZ' 等开头。感谢您查看此处。
根据你的评论,亚伯拉罕,我的代码看起来应该是这样的,对吧...
file_list=[]
path = 'dbfs/rawdata/2019/06/28/Parent/'
files = dbutils.fs.ls(path)
for file in files:
if(file.name.startswith('ABC')):
file_list.append(file.name)
df = spark.read.load(path=file_list)
这是正确的还是不正确的?请指教。我认为我们很接近,但这对我来说仍然不起作用,否则我不会 re-posting 在这里。谢谢!!
PySpark 支持使用加载函数加载文件列表。我相信这就是您要找的
file_list=[]
path = 'dbfs/mnt/rawdata/2019/06/28/Parent/'
files = dbutils.fs.ls(path)
for file in files:
if(file.name.startswith('ABC')):
file_list.append(file.name)
df = spark.read.load(path=file_list)
如果文件是 CSV 文件并且 header 使用下面的命令
df = spark.read.load(path=file_list,format="csv", sep=",", inferSchema="true", header="true")
更多示例代码请参考https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
我终于,终于,终于开始工作了。
val myDFCsv = spark.read.format("csv")
.option("sep","|")
.option("inferSchema","true")
.option("header","false")
.load("mnt/rawdata/2019/01/01/client/ABC*.gz")
myDFCsv.show()
myDFCsv.count()
显然所有压缩文件和推断模式任务都是自动处理的。因此,代码超级、超级轻量级,而且非常快。