UDF to extract only the file name from path in Spark SQL
There is an input_file_name function in Apache Spark, which I use to add a new column to a Dataset containing the name of the file currently being processed.
The problem is that I'd like to somehow customize this function so that it returns only the file name, omitting the full path on s3.
For now, I am replacing the path in a second step using a map function:
val initialDs = spark.sqlContext.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
  val baseName: String = FilenameUtils.getBaseName(fileName)
  val tmpFileName: String = baseName.substring(0, baseName.length - 8) // here is magic conversion ;)
  this.valueOf(tmpFileName)
}
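For context, that second step looks roughly like the sketch below (Record is an illustrative stand-in for the actual row type, and the encoder import assumes a spark-shell-style session):

import spark.implicits._

// hypothetical schema; map over the Dataset and replace the full path with the converted name
case class Record(value: String, input_file_name: String)

val cleanedDs = initialDs.as[Record].map { r =>
  r.copy(input_file_name = fromFile(r.input_file_name))
}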
But I would like to use something like:
val initialDs = spark.sqlContext.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", customized_input_file_name_function)
In Scala:
import org.apache.spark.sql.functions.input_file_name

// register the udf; register returns a UserDefinedFunction that can be
// applied directly in DataFrame expressions
val get_only_file_name = spark.udf
  .register("get_only_file_name", (fullPath: String) => fullPath.split("/").last)

// use the udf to get the last token (the file name) in the full path
val initialDs = spark.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", get_only_file_name(input_file_name()))
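If the function is only needed from the DataFrame API (not from SQL), a variant using org.apache.spark.sql.functions.udf skips registration entirely; a small sketch under that assumption:

import org.apache.spark.sql.functions.{input_file_name, udf}

// same logic as above, but unregistered and usable only from the DataFrame API
val getOnlyFileName = udf((fullPath: String) => fullPath.split("/").last)

val initialDs = spark.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", getOnlyFileName(input_file_name()))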
EDIT: In Java, as per the comments:
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.input_file_name;
import org.apache.spark.sql.types.DataTypes;

// register udf
spark.udf()
    .register("get_only_file_name", (String fullPath) -> {
        int lastIndex = fullPath.lastIndexOf("/");
        return fullPath.substring(lastIndex + 1); // everything after the last "/"
    }, DataTypes.StringType);

// use the udf to get the last token (the file name) in the full path
Dataset<Row> initialDs = spark.read()
    .option("dateFormat", conf.dateFormat)
    .schema(conf.schema)
    .csv(conf.path)
    .withColumn("input_file_name", callUDF("get_only_file_name", input_file_name()));
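Note the difference from the Scala version: in Java the registered UDF is invoked by name through callUDF, rather than applied directly to a Column.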
Borrowing from a related question, the approach below is more portable and does not require a custom UDF.
Spark SQL snippet: reverse(split(path, '/'))[0]
Spark SQL example:
WITH sample_data as (
    SELECT 'path/to/my/filename.txt' AS full_path
)
SELECT
    full_path
    , reverse(split(full_path, '/'))[0] as basename
FROM sample_data
Explanation: The split() function breaks the path into chunks, reverse() puts the last item (the file name) at the front of the array, so that [0] can extract just the file name.
Here is a full code example:
spark.sql(
  """
    |WITH sample_data as (
    |    SELECT 'path/to/my/filename.txt' AS full_path
    |)
    |SELECT
    |    full_path
    |    , reverse(split(full_path, '/'))[0] as basename
    |FROM sample_data
    |""".stripMargin).show(false)
Result:
+-----------------------+------------+
|full_path |basename |
+-----------------------+------------+
|path/to/my/filename.txt|filename.txt|
+-----------------------+------------+
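The same snippet also works from the DataFrame API via expr; a small sketch, assuming df is a DataFrame that has a full_path column:

import org.apache.spark.sql.functions.expr

// apply the Spark SQL snippet as a column expression
val withBasename = df.withColumn("basename", expr("reverse(split(full_path, '/'))[0]"))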
Commons IO is the natural/easiest import in Spark terms (no extra dependency needs to be added...):
import org.apache.commons.io.FilenameUtils
getBaseName(String fileName)
Gets the base name, minus the full path and extension, from a full file name.
import org.apache.spark.sql.functions.udf

val baseNameOfFile = udf((longFilePath: String) => FilenameUtils.getBaseName(longFilePath))
Usage is like...
yourdataframe.withColumn("shortpath", baseNameOfFile(yourdataframe("input_file_name")))
  .show(1000, false)
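Note that getBaseName also strips the extension ("filename.txt" becomes "filename"). If the extension should be kept, Commons IO's getName can be swapped in; a small sketch:

import org.apache.commons.io.FilenameUtils
import org.apache.spark.sql.functions.udf

// keeps the extension: "path/to/my/filename.txt" -> "filename.txt"
val nameOfFile = udf((longFilePath: String) => FilenameUtils.getName(longFilePath))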