How to convert a file with multiple delimiters to a DataFrame

I have a text file like this:

1234_4567_DigitalDoc_XRay-01.pdf
2345_5678_DigitalDoc_CTC-03.png
1234_5684_DigitalDoc_XRay-05.pdf
1234_3345_DigitalDoc_XRay-02.pdf

The output I expect is:

| catg|sub_catg|      doc_name        |revision_label|extension|
|1234|     4567|DigitalDoc_XRay-01.pdf|   01         |pdf      |

I created a custom schema:

 val customSchema = StructType(
      StructField("catg", StringType, true)
        :: StructField("sub_catg", StringType, true)
        :: StructField("doc_name", StringType, true)
        :: StructField("revision_label", StringType, true)
        :: StructField("extension", StringType, true)
        :: Nil
    )

I am trying to create the DataFrame like this:

val df = sparkSession.read
  .format("csv")
  .schema(customSchema)
  .option("delimiter", "_")
  .load("src/main/resources/data/sample.txt")

df.show()

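I would like to know how to split each line with custom logic.

In Java I would probably write something like the code below. Can someone help me solve this? I am new to Spark.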

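String[] word = line.split("_");

            // word[3] looks like "XRay-01.pdf", so the part after "-" is "01.pdf"
            String[] fileName = word[3].split("-");
            String revision = fileName[1];
            String record = word[0] + "," + word[1] + "," + word[2] + "_" + word[3] + "," + revision.replace(".", " ");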

You can use Spark functions to get the required details:

1. Load the data

    import org.apache.spark.sql.types._
    import spark.implicits._ // provides .toDS() on Seq[String]

    val data =
      """
        |1234_4567_DigitalDoc_XRay-01.pdf
        |2345_5678_DigitalDoc_CTC-03.png
        |1234_5684_DigitalDoc_XRay-05.pdf
        |1234_3345_DigitalDoc_XRay-02.pdf
      """.stripMargin

    val customSchema = StructType(
      StructField("catg", StringType, true)
        :: StructField("sub_catg", StringType, true)
        :: StructField("doc_name", StringType, true)
        :: StructField("revision_label", StringType, true)
        :: StructField("extension", StringType, true)
        :: Nil
    )
    val df = spark.read.schema(customSchema)
      .option("sep", "_")
      .csv(data.split(System.lineSeparator()).toSeq.toDS())
    df.show(false)
    df.printSchema()

Output:

+----+--------+----------+--------------+---------+
|catg|sub_catg|doc_name  |revision_label|extension|
+----+--------+----------+--------------+---------+
|1234|4567    |DigitalDoc|XRay-01.pdf   |null     |
|2345|5678    |DigitalDoc|CTC-03.png    |null     |
|1234|5684    |DigitalDoc|XRay-05.pdf   |null     |
|1234|3345    |DigitalDoc|XRay-02.pdf   |null     |
+----+--------+----------+--------------+---------+

root
 |-- catg: string (nullable = true)
 |-- sub_catg: string (nullable = true)
 |-- doc_name: string (nullable = true)
 |-- revision_label: string (nullable = true)
 |-- extension: string (nullable = true)

2. Extract the required information

    import org.apache.spark.sql.functions.{col, concat_ws, regexp_extract, substring_index}

    df.withColumn("doc_name", concat_ws("_", col("doc_name"), col("revision_label")))
      .withColumn("extension", substring_index(col("revision_label"), ".", -1))
      .withColumn("revision_label", regexp_extract(col("revision_label"),"""\d+""", 0))
      .show(false)

Output:

+----+--------+----------------------+--------------+---------+
|catg|sub_catg|doc_name              |revision_label|extension|
+----+--------+----------------------+--------------+---------+
|1234|4567    |DigitalDoc_XRay-01.pdf|01            |pdf      |
|2345|5678    |DigitalDoc_CTC-03.png |03            |png      |
|1234|5684    |DigitalDoc_XRay-05.pdf|05            |pdf      |
|1234|3345    |DigitalDoc_XRay-02.pdf|02            |pdf      |
+----+--------+----------------------+--------------+---------+
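To sanity-check what the two extraction steps do to a single value, here is a minimal plain-Scala sketch that needs no Spark session. The object and method names (`ExtractionCheck`, `firstDigits`, `afterLastDot`) are hypothetical stand-ins I made up to mimic `regexp_extract(..., """\d+""", 0)` and `substring_index(..., ".", -1)`; they are not Spark APIs.

```scala
object ExtractionCheck {
  // Stand-in for regexp_extract(col, """\d+""", 0): the first run of digits, or "" if there is none
  def firstDigits(s: String): String = """\d+""".r.findFirstIn(s).getOrElse("")

  // Stand-in for substring_index(col, ".", -1): everything after the last "."
  // (the whole string is returned when there is no "." at all)
  def afterLastDot(s: String): String = s.substring(s.lastIndexOf('.') + 1)

  def main(args: Array[String]): Unit = {
    println(firstDigits("XRay-01.pdf"))  // prints 01
    println(afterLastDot("XRay-01.pdf")) // prints pdf
    println(firstDigits("MRI2-07.pdf"))  // prints 2, not 07
  }
}
```

The last call shows the one thing to watch: `\d+` picks up the first digits it finds, so if a document type can itself contain a digit, a pattern anchored on the hyphen (for example `-(\d+)\.` with group 1) would probably be safer.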

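Picking up where you left off: you can skip the schema definition and just give the column names. The rest of the explanation is inline in the code.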

import org.apache.spark.sql.DataFrame

object ParseFileNameToInfo {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    val df : DataFrame = spark.read
      .format("csv")
      .option("delimiter", "_")
      .load("src/main/resources/sampleFileNames.txt")
      // No schema definition is needed: the layout is simple and every column is a string
      .toDF("catg", "sub_catg", "doc_name", "extraColumn")


    import spark.implicits._

    val output : DataFrame = df.rdd
      //Map the 4 columns to our output columns
      .map( row => {
      val extraColumn = row.getString(3)
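      // "XRay-01.pdf" -> "01.pdf" -> Array("01", "pdf"); split takes a regex, so the dot is escaped
      val fileInfo = extraColumn.substring(extraColumn.indexOf("-") + 1).split("\\.")
      // Put back the "_" that the CSV reader consumed between doc_name and the last field
      (row.getString(0), row.getString(1), row.getString(2) + "_" + row.getString(3), fileInfo(0), fileInfo(1))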
    })
      //Convert them to required output Dataframe
      .toDF("catg","sub_catg","doc_name","revision_label","extension")

    output.show()

  }

}
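If you prefer to keep the per-line logic in one place, the mapping can also be sketched as a single regular expression in plain Scala. Everything named here (`FileNameParser`, `FileInfo`, `parse`) is hypothetical, and the pattern assumes each line has exactly the shape `catg_subcatg_<name>-<revision>.<extension>`; lines that do not fit are returned as `None` rather than throwing.

```scala
object FileNameParser {
  // One parsed line; the field names mirror the columns asked for in the question
  case class FileInfo(catg: String, subCatg: String, docName: String,
                      revisionLabel: String, extension: String)

  // Groups: 1 = catg, 2 = sub_catg, 3 = doc_name (the whole remainder),
  // 4 = revision (after the last "-"), 5 = extension (after the ".")
  private val Pattern = """([^_]+)_([^_]+)_(.+-(\w+)\.(\w+))""".r

  // A Regex used as an extractor must match the entire line
  def parse(line: String): Option[FileInfo] = line match {
    case Pattern(catg, subCatg, docName, revision, ext) =>
      Some(FileInfo(catg, subCatg, docName, revision, ext))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(parse("1234_4567_DigitalDoc_XRay-01.pdf"))
    println(parse("not a valid line"))
  }
}
```

With Spark, one way to wire this in (untested here, so treat it as an assumption) would be to read the file with `spark.read.textFile(...)` and apply something like `flatMap(line => FileNameParser.parse(line).toList)` before calling `toDF()`, so malformed lines are dropped instead of failing the job.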

You can use the "split" function:

  import org.apache.spark.sql.functions._
  import spark.implicits._ // for toDF and the $"..." column syntax

  val df = Seq("1234_4567_DigitalDoc_XRay-01.pdf",
  "2345_5678_DigitalDoc_CTC-03.png",
  "1234_5684_DigitalDoc_XRay-05.pdf",
  "1234_3345_DigitalDoc_XRay-02.pdf")
  .toDF("filename")

  df.select(split($"filename","_").as("x"))
  .select(
    $"x".getItem(0).as("cat"),
    $"x".getItem(1).as("subcat"),
    $"x".getItem(2).as("doc"),
    split($"x".getItem(3), "\\.").as("y")
  )
  .select($"cat", $"subcat", $"doc",
    $"y".getItem(0).as("rev"),
    $"y".getItem(1).as("ext")
  )
  .show(false)

+----+------+----------+-------+---+
|cat |subcat|doc       |rev    |ext|
+----+------+----------+-------+---+
|1234|4567  |DigitalDoc|XRay-01|pdf|
|2345|5678  |DigitalDoc|CTC-03 |png|
|1234|5684  |DigitalDoc|XRay-05|pdf|
|1234|3345  |DigitalDoc|XRay-02|pdf|
+----+------+----------+-------+---+
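Note that `rev` above still carries the `XRay-` prefix. As a rough sketch of how the last field could be taken apart further, here is the same idea in plain Scala, where `String.split` also takes a regular expression just like the Spark `split` function. The names `SplitCheck` and `revisionAndExtension` are made up for illustration, and the code assumes the field contains at least one dot.

```scala
object SplitCheck {
  // "XRay-01.pdf" -> ("01", "pdf"). The dot is escaped because split takes a regex,
  // and the limit of 2 splits on the first dot only.
  def revisionAndExtension(lastField: String): (String, String) = {
    val Array(stem, ext) = lastField.split("\\.", 2) // MatchError if there is no dot
    (stem.split("-").last, ext)
  }

  def main(args: Array[String]): Unit = {
    println(revisionAndExtension("XRay-01.pdf"))
    // An unescaped "." matches every character, so nothing is left after splitting
    println("XRay-01.pdf".split(".").length) // prints 0
  }
}
```

On the Spark side the equivalent would presumably be one more `split` on `"-"` for the first item of `y`, or a `regexp_extract` on the digits.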

Since all the columns are strings, you do not need to define a schema in your case. You can write the code like this:

import org.apache.spark.sql.functions.{col,split,regexp_extract}

val data = spark.read.option("delimiter", "_").csv("src/main/resources/data/sample.txt").toDF("catg", "sub_catg", "doc_name", "no_name")

val df2 = data.withColumn("revision_label", regexp_extract(col("no_name"), ".*-(\\w+)\\.", 1)).withColumn("extension", split(col("no_name"), "\\.")(1)).drop("no_name")