使用 sc.textFile 从子目录中递归获取文件内容
Recursively fetch file contents from subdirectories using sc.textFile
似乎 SparkContext textFile 只希望文件出现在给定的目录位置 - 它也不希望
- (a) 递归或
- (b) 甚至 支持 目录(尝试将目录作为文件读取)
关于如何构建递归的任何建议 - 可能比手动创建递归文件列表/下降逻辑更简单?
这是用例:
下的文件
/data/tables/my_table
我希望能够通过 hdfs 调用读取该父目录下所有目录级别的所有文件。
更新
sc.textFile() 通过(子类)TextInputFormat 调用 Hadoop FileInputFormat。内部逻辑确实存在进行递归目录读取 - 即首先检测条目是否是目录,如果是则降序:
<!-- language: java -->
for (FileStatus globStat: matches) {
218 if (globStat.isDir()) {
219 for(FileStatus stat: fs.listStatus(globStat.getPath(),
220 inputFilter)) {
221 result.add(stat);
222 }
223 } else {
224 result.add(globStat);
225 }
226 }
然而,当调用 sc.textFile 时,目录条目出现错误:"not a file"。这种行为令人困惑 - 考虑到处理目录的适当支持似乎已经到位。
我在看旧版本的 FileInputFormat..
BEFORE 设置递归配置 mapreduce.input.fileinputformat.input.dir.recursive
scala> sc.textFile("dev/*").count
java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build
默认为 null/not 集,计算结果为 "false":
scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res1: String = null
之后:
现在设置值:
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
现在重试递归操作:
scala>sc.textFile("dev/*/*").count
..
res5: Long = 3481
So it works.
Update 添加了 / 用于 @Ben
的每个评论的完整递归
我发现这些参数必须按以下方式设置:
.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")
似乎 SparkContext textFile 只希望文件出现在给定的目录位置 - 它也不希望
- (a) 递归或
- (b) 甚至 支持 目录(尝试将目录作为文件读取)
关于如何构建递归的任何建议 - 可能比手动创建递归文件列表/下降逻辑更简单?
这是用例:
下的文件/data/tables/my_table
我希望能够通过 hdfs 调用读取该父目录下所有目录级别的所有文件。
更新
sc.textFile() 通过(子类)TextInputFormat 调用 Hadoop FileInputFormat。内部逻辑确实存在进行递归目录读取 - 即首先检测条目是否是目录,如果是则降序:
<!-- language: java -->
for (FileStatus globStat: matches) {
218 if (globStat.isDir()) {
219 for(FileStatus stat: fs.listStatus(globStat.getPath(),
220 inputFilter)) {
221 result.add(stat);
222 }
223 } else {
224 result.add(globStat);
225 }
226 }
然而,当调用 sc.textFile 时,目录条目出现错误:"not a file"。这种行为令人困惑 - 考虑到处理目录的适当支持似乎已经到位。
我在看旧版本的 FileInputFormat..
BEFORE 设置递归配置 mapreduce.input.fileinputformat.input.dir.recursive
scala> sc.textFile("dev/*").count
java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build
默认为 null/not 集,计算结果为 "false":
scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res1: String = null
之后:
现在设置值:
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
现在重试递归操作:
scala>sc.textFile("dev/*/*").count
..
res5: Long = 3481
So it works.
Update 添加了 / 用于 @Ben
的每个评论的完整递归我发现这些参数必须按以下方式设置:
.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")