如何列出 Databricks dbfs 中的文件键**没有** dbutils
How to list file keys in Databricks dbfs **without** dbutils
显然 dbutils cannot be used in cmd-line spark-submits, you must use Jar Jobs for that,但由于其他要求,我必须使用 spark-submit 样式的作业,但仍然需要在 dbfs 中列出和迭代文件键以决定使用哪些文件作为输入到流程...
使用 scala,我可以使用 spark 或 hadoop 中的哪个库来检索特定模式的 dbfs:/filekeys
列表?
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
def ls(sparkSession: SparkSession, inputDir: String): Seq[String] = {
println(s"FileUtils.ls path: $inputDir")
val path = new Path(inputDir)
val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val fileStatuses = fs.listStatus(path)
fileStatuses.filter(_.isFile).map(_.getPath).map(_.getName).toSeq
}
使用上面的方法,如果我传入像 dbfs:/mnt/path/to/folder
这样的部分键前缀,而在所述“文件夹”中存在以下键:
/mnt/path/to/folder/file1.csv
/mnt/path/to/folder/file2.csv
当它命中时我得到 dbfs:/mnt/path/to/folder is not a directory
val path = new Path(inputDir)
需要使用SparkSession来完成
我们是这样做的:
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
def getFileSystem(sparkSession: SparkSession): FileSystem =
FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
def listContents(sparkSession: SparkSession, dir: String): Seq[String] = {
getFileSystem(sparkSession).listStatus(new path(dir)).toSeq.map(_.getPath).map(_.getName)
}
显然 dbutils cannot be used in cmd-line spark-submits, you must use Jar Jobs for that,但由于其他要求,我必须使用 spark-submit 样式的作业,但仍然需要在 dbfs 中列出和迭代文件键以决定使用哪些文件作为输入到流程...
使用 scala,我可以使用 spark 或 hadoop 中的哪个库来检索特定模式的 dbfs:/filekeys
列表?
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
def ls(sparkSession: SparkSession, inputDir: String): Seq[String] = {
println(s"FileUtils.ls path: $inputDir")
val path = new Path(inputDir)
val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val fileStatuses = fs.listStatus(path)
fileStatuses.filter(_.isFile).map(_.getPath).map(_.getName).toSeq
}
使用上面的方法,如果我传入像 dbfs:/mnt/path/to/folder
这样的部分键前缀,而在所述“文件夹”中存在以下键:
/mnt/path/to/folder/file1.csv
/mnt/path/to/folder/file2.csv
当它命中时我得到 dbfs:/mnt/path/to/folder is not a directory
val path = new Path(inputDir)
需要使用SparkSession来完成
我们是这样做的:
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
def getFileSystem(sparkSession: SparkSession): FileSystem =
FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
def listContents(sparkSession: SparkSession, dir: String): Seq[String] = {
getFileSystem(sparkSession).listStatus(new path(dir)).toSeq.map(_.getPath).map(_.getName)
}