Pyspark 从文件名中获取子字符串并存储为新列

Pyspark Obtain Substring from Filename and Store as New Column

我正在使用 pyspark 处理来自 S3 的 CSV 文件,但是我希望将文件名合并为我正在使用以下代码的新列:

spark.udf.register("filenamefunc", lambda x: x.rsplit('/', 1)[-2])
df=spark.read.csv("s3a://exportcsv-battery/S5/243/101*",sep=',',header=True,inferSchema=True)
df=df.withColumn("filename", 'filenamefunc(input_file_name())')

但是我想要它的子字符串而不是文件名,例如,如果这是 input_file_name:-

s3a://exportcsv-battery/S5/243/101_002932_243_AAA_A_T01_AAA_AAA_0_0_0_0_2_10Hz.csv

我只想提取 243 并将其存储在我为其定义 UDF 的新列中:

spark.udf.register("filenamefunc", lambda x: x.rsplit('/', 1)[-2])

但是好像不行。我可以做些什么来修复它或采用不同的方法吗?谢谢!

您可以使用split()函数

import pyspark.sql.functions as f

[...]

df = df.withColumn('filename', f.split(f.input_file_name(), '/')[4])