How to convert a file with multiple delimiters to a DataFrame
I have a text file as below:
1234_4567_DigitalDoc_XRay-01.pdf
2345_5678_DigitalDoc_CTC-03.png
1234_5684_DigitalDoc_XRay-05.pdf
1234_3345_DigitalDoc_XRay-02.pdf
I expect the output to be:
| catg|sub_catg| doc_name |revision_label|extension|
|1234| 4567|DigitalDoc_XRay-01.pdf| 01 |pdf |
I created a custom schema:
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val customSchema = StructType(
  StructField("catg", StringType, true)
    :: StructField("sub_catg", StringType, true)
    :: StructField("doc_name", StringType, true)
    :: StructField("revision_label", StringType, true)
    :: StructField("extension", StringType, true)
    :: Nil
)
I am trying to create the dataframe as:
val df = sparkSession.read
  .format("csv")
  .schema(customSchema)
  .option("delimiter", "_")
  .load("src/main/resources/data/sample.txt")
df.show()
With the "_" delimiter alone the last field stays as a single column, so I would like to know how to break each line into the custom records I need.
In Java I would probably write something like the code below. Can someone help me solve this? I am new to Spark.
String[] word = line.split("_");
String[] fileName = word[3].split("-");
String revision = fileName[1];
String result = word[0] + "," + word[1] + "," + word[2] + "_" + word[3] + "," + revision.replace(".", " ");
You can use Spark functions to get the required details -
1. Load the data
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

val data =
  """
    |1234_4567_DigitalDoc_XRay-01.pdf
    |2345_5678_DigitalDoc_CTC-03.png
    |1234_5684_DigitalDoc_XRay-05.pdf
    |1234_3345_DigitalDoc_XRay-02.pdf
  """.stripMargin

val customSchema = StructType(
  StructField("catg", StringType, true)
    :: StructField("sub_catg", StringType, true)
    :: StructField("doc_name", StringType, true)
    :: StructField("revision_label", StringType, true)
    :: StructField("extension", StringType, true)
    :: Nil
)

val df = spark.read.schema(customSchema)
  .option("sep", "_")
  .csv(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()
Output -
+----+--------+----------+--------------+---------+
|catg|sub_catg|doc_name |revision_label|extension|
+----+--------+----------+--------------+---------+
|1234|4567 |DigitalDoc|XRay-01.pdf |null |
|2345|5678 |DigitalDoc|CTC-03.png |null |
|1234|5684 |DigitalDoc|XRay-05.pdf |null |
|1234|3345 |DigitalDoc|XRay-02.pdf |null |
+----+--------+----------+--------------+---------+
root
|-- catg: string (nullable = true)
|-- sub_catg: string (nullable = true)
|-- doc_name: string (nullable = true)
|-- revision_label: string (nullable = true)
|-- extension: string (nullable = true)
2. Extract the required information
df.withColumn("doc_name", concat_ws("_", col("doc_name"), col("revision_label")))
.withColumn("extension", substring_index(col("revision_label"), ".", -1))
.withColumn("revision_label", regexp_extract(col("revision_label"),"""\d+""", 0))
.show(false)
Output -
+----+--------+----------------------+--------------+---------+
|catg|sub_catg|doc_name |revision_label|extension|
+----+--------+----------------------+--------------+---------+
|1234|4567 |DigitalDoc_XRay-01.pdf|01 |pdf |
|2345|5678 |DigitalDoc_CTC-03.png |03 |png |
|1234|5684 |DigitalDoc_XRay-05.pdf|05 |pdf |
|1234|3345 |DigitalDoc_XRay-02.pdf|02 |pdf |
+----+--------+----------------------+--------------+---------+
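If you want to run the same two steps against the file on disk rather than the inline sample, a minimal sketch (assuming the sample.txt path from the question and the customSchema defined above) would be:

// Sketch: same schema and transformations, but loading the question's file directly.
val fileDf = spark.read
  .schema(customSchema)
  .option("sep", "_")
  .csv("src/main/resources/data/sample.txt")

fileDf
  .withColumn("doc_name", concat_ws("_", col("doc_name"), col("revision_label")))
  .withColumn("extension", substring_index(col("revision_label"), ".", -1))
  .withColumn("revision_label", regexp_extract(col("revision_label"), """\d+""", 0))
  .show(false)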
Picking up from where you left off.
You can skip the schema definition and just mention the column names. The rest is explained inline in the code.
import org.apache.spark.sql.DataFrame

object ParseFileNameToInfo {

  def main(args: Array[String]): Unit = {
    val spark = Constant.getSparkSess // your own SparkSession factory

    val df: DataFrame = spark.read
      .format("csv")
      .option("delimiter", "_")
      .load("src/main/resources/sampleFileNames.txt")
      // You don't need a schema definition as the layout is simple and all columns are strings
      .toDF("catg", "sub_catg", "doc_name", "extraColumn")

    import spark.implicits._

    val output: DataFrame = df.rdd
      // Map the 4 input columns to the 5 output columns
      .map(row => {
        val extraColumn = row.getString(3)
        // "XRay-01.pdf" -> revision "01" and extension "pdf"
        val fileInfo = extraColumn.substring(extraColumn.indexOf("-") + 1).split("\\.")
        (row.getString(0), row.getString(1), row.getString(2).concat("_").concat(row.getString(3)), fileInfo(0), fileInfo(1))
      })
      // Convert the tuples to the required output DataFrame
      .toDF("catg", "sub_catg", "doc_name", "revision_label", "extension")

    output.show()
  }
}
You can use the "split" function:
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq("1234_4567_DigitalDoc_XRay-01.pdf",
             "2345_5678_DigitalDoc_CTC-03.png",
             "1234_5684_DigitalDoc_XRay-05.pdf",
             "1234_3345_DigitalDoc_XRay-02.pdf")
  .toDF("filename")

df.select(split($"filename", "_").as("x"))
  .select(
    $"x".getItem(0).as("cat"),
    $"x".getItem(1).as("subcat"),
    $"x".getItem(2).as("doc"),
    split($"x".getItem(3), "\\.").as("y")
  )
  .select($"cat", $"subcat", $"doc",
    $"y".getItem(0).as("rev"),
    $"y".getItem(1).as("ext")
  )
  .show(false)
+----+------+----------+-------+---+
|cat |subcat|doc |rev |ext|
+----+------+----------+-------+---+
|1234|4567 |DigitalDoc|XRay-01|pdf|
|2345|5678 |DigitalDoc|CTC-03 |png|
|1234|5684 |DigitalDoc|XRay-05|pdf|
|1234|3345 |DigitalDoc|XRay-02|pdf|
+----+------+----------+-------+---+
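Note that doc and rev above keep the raw pieces ("DigitalDoc", "XRay-01"). If you want the exact output from the question (full file name in doc_name and a numeric revision), one possible sketch that recombines the split pieces, reusing the same df and column names, is:

// Sketch: recombine the split parts so doc_name matches the original file name
// and revision_label holds only the digits.
df.select(split($"filename", "_").as("x"))
  .select(
    $"x".getItem(0).as("catg"),
    $"x".getItem(1).as("sub_catg"),
    concat_ws("_", $"x".getItem(2), $"x".getItem(3)).as("doc_name"),
    regexp_extract($"x".getItem(3), """-(\d+)\.""", 1).as("revision_label"),
    substring_index($"x".getItem(3), ".", -1).as("extension")
  )
  .show(false)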
Since all columns are of string type, there is no need to define a schema in your case. You can write code as below:
import org.apache.spark.sql.functions.{col, split, regexp_extract}

val data = spark.read.option("delimiter", "_")
  .csv("src/main/resources/data/sample.txt")
  .toDF("catg", "sub_catg", "doc_name", "no_name")

val df2 = data
  .withColumn("revision_label", regexp_extract(col("no_name"), """.*-(\w+)\.""", 1))
  .withColumn("extension", split(col("no_name"), "\\.")(1))
  .drop("no_name")
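With this variant doc_name only holds the third field ("DigitalDoc"). If you need the whole file name there, as in your expected output, one option (a sketch on my part using concat_ws, not part of the code above) is to join it with no_name before dropping it:

// Sketch: keep the whole file name in doc_name by joining it with the last field
// before dropping the helper column.
import org.apache.spark.sql.functions.concat_ws

val df3 = data
  .withColumn("doc_name", concat_ws("_", col("doc_name"), col("no_name")))
  .withColumn("revision_label", regexp_extract(col("no_name"), """.*-(\w+)\.""", 1))
  .withColumn("extension", split(col("no_name"), "\\.")(1))
  .drop("no_name")

df3.show(false)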