如何在 Spark 行上执行 ETL 并将其 return 到数据帧?

How can I perform ETL on a Spark Row and return it to a dataframe?

我目前正在将 Scala Spark 用于某些 ETL,并且有一个包含以下架构的基础数据框

|-- round: string (nullable = true)
|-- Id : string (nullable = true)
|-- questions: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- tag: string (nullable = true)
|    |    |-- bonusQuestions: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|    |    |-- difficulty : string (nullable = true)
|    |    |-- answerOptions: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|    |    |-- followUpAnswers: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|-- school: string (nullable = true)

我只需要对round类型为primary的行进行ETL(有primary和secondary两种类型)。但是,我在最终 table 中需要这两种类型的行。

我一直在做 ETL,应该根据 - 如果标签是非奖励,bonusQuestions 应设置为 nulldifficulty 应设置为 null

我目前能够访问 DF 的大部分字段,例如 val round = tr.getAs[String]("round")

接下来,我可以使用

获取问题数组

val questionsArray = tr.getAs[Seq[StructType]]("questions")

并且可以使用 for (question <- questionsArray) {...} 进行迭代;但是我无法访问 question.bonusQuestionsquestion.tag 之类的结构字段,其中 returns 是一个错误

error: value tag is not a member of org.apache.spark.sql.types.StructType

Spark 将 StructType 视为 GenericRowWithSchema,更具体地说是 Row。因此,您必须使用 Seq[Row] 作为

而不是 Seq[StructType]
val questionsArray = tr.getAs[Seq[Row]]("questions")

并且在循环 for (question <- questionsArray) {...} 中你可以得到 Row 的数据作为

for (question <- questionsArray) {
    val tag = question.getAs[String]("tag")
    val bonusQuestions = question.getAs[Seq[String]]("bonusQuestions")
    val difficulty = question.getAs[String]("difficulty")
    val answerOptions = question.getAs[Seq[String]]("answerOptions")
    val followUpAnswers = question.getAs[Seq[String]]("followUpAnswers")
  }

希望回答对你有帮助