如何根据文件模式将不同的文件加载到不同的表中?
How to load different files into different tables, based on file pattern?
我是 运行 一个简单的 PySpark 脚本,就像这样。
base_path = '/mnt/rawdata/'
file_names = ['2018/01/01/ABC1_20180101.gz',
'2018/01/02/ABC2_20180102.gz',
'2018/01/03/ABC3_20180103.gz',
'2018/01/01/XYZ1_20180101.gz'
'2018/01/02/XYZ1_20180102.gz']
for f in file_names:
print(f)
所以,只是测试一下,我可以找到文件并打印字符串。现在,我想弄清楚如何将每个文件的内容加载到 SQL 服务器中的特定 table。问题是,我想对与模式匹配的文件进行通配符搜索,并将特定文件加载到特定的 table 中。所以,我想做以下事情:
- 将名称中带有 'ABC' 的所有文件加载到我的 'ABC_Table' 中,并将名称中带有 'XYZ' 的所有文件加载到我的 'XYZ_Table' 中(所有数据开始于第 2 行,而不是第 1 行)
- 将文件名加载到每个 table 中名为 'file_name' 的字段中(我完全可以接受 'file_names' 中的整个字符串或 'file_names' 之后的字符串部分最后一个 '/' 字符;无关紧要)
我尝试为此使用 Azure 数据工厂,它可以很好地递归循环遍历所有文件,但它没有加载文件名,我真的需要 table 来区分哪些记录来自哪些文件和日期。是否可以使用 Azure Databricks 执行此操作?我觉得这是一个可以实现的 ETL 过程,但我对 ADB 的了解还不够多,无法完成这项工作。
根据大牛的推荐更新
dfCW = sc.sequenceFile('/mnt/rawdata/2018/01/01/ABC%.gz/').toDF()
dfCW.withColumn('input', input_file_name())
print(dfCW)
给我:
com.databricks.backend.daemon.data.common.InvalidMountException:
接下来我可以尝试什么?
您可以使用 pyspark.sql.functions
中的 input_file_name
例如
withFiles = df.withColumn("file", input_file_name())
之后您可以通过对新列进行过滤来创建多个数据框
abc = withFiles.filter(col("file").like("%ABC%"))
xyz = withFiles.filter(col("file").like("%XYZ%"))
然后为他们两个使用普通的编写器。
我是 运行 一个简单的 PySpark 脚本,就像这样。
base_path = '/mnt/rawdata/'
file_names = ['2018/01/01/ABC1_20180101.gz',
'2018/01/02/ABC2_20180102.gz',
'2018/01/03/ABC3_20180103.gz',
'2018/01/01/XYZ1_20180101.gz'
'2018/01/02/XYZ1_20180102.gz']
for f in file_names:
print(f)
所以,只是测试一下,我可以找到文件并打印字符串。现在,我想弄清楚如何将每个文件的内容加载到 SQL 服务器中的特定 table。问题是,我想对与模式匹配的文件进行通配符搜索,并将特定文件加载到特定的 table 中。所以,我想做以下事情:
- 将名称中带有 'ABC' 的所有文件加载到我的 'ABC_Table' 中,并将名称中带有 'XYZ' 的所有文件加载到我的 'XYZ_Table' 中(所有数据开始于第 2 行,而不是第 1 行)
- 将文件名加载到每个 table 中名为 'file_name' 的字段中(我完全可以接受 'file_names' 中的整个字符串或 'file_names' 之后的字符串部分最后一个 '/' 字符;无关紧要)
我尝试为此使用 Azure 数据工厂,它可以很好地递归循环遍历所有文件,但它没有加载文件名,我真的需要 table 来区分哪些记录来自哪些文件和日期。是否可以使用 Azure Databricks 执行此操作?我觉得这是一个可以实现的 ETL 过程,但我对 ADB 的了解还不够多,无法完成这项工作。
根据大牛的推荐更新
dfCW = sc.sequenceFile('/mnt/rawdata/2018/01/01/ABC%.gz/').toDF()
dfCW.withColumn('input', input_file_name())
print(dfCW)
给我:
com.databricks.backend.daemon.data.common.InvalidMountException:
接下来我可以尝试什么?
您可以使用 pyspark.sql.functions
中的 input_file_name
例如
withFiles = df.withColumn("file", input_file_name())
之后您可以通过对新列进行过滤来创建多个数据框
abc = withFiles.filter(col("file").like("%ABC%"))
xyz = withFiles.filter(col("file").like("%XYZ%"))
然后为他们两个使用普通的编写器。