如何在 Spark 行上执行 ETL 并将其 return 到数据帧?
How can I perform ETL on a Spark Row and return it to a dataframe?
我目前正在将 Scala Spark 用于某些 ETL,并且有一个包含以下架构的基础数据框
|-- round: string (nullable = true)
|-- Id : string (nullable = true)
|-- questions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- tag: string (nullable = true)
| | |-- bonusQuestions: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- difficulty : string (nullable = true)
| | |-- answerOptions: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- followUpAnswers: array (nullable = true)
| | | |-- element: string (containsNull = true)
|-- school: string (nullable = true)
我只需要对round
类型为primary
的行进行ETL(有primary和secondary两种类型)。但是,我在最终 table 中需要这两种类型的行。
我一直在做 ETL,应该根据 -
如果标签是非奖励,bonusQuestions
应设置为 null
,difficulty
应设置为 null
。
我目前能够访问 DF 的大部分字段,例如
val round = tr.getAs[String]("round")
接下来,我可以使用
获取问题数组
val questionsArray = tr.getAs[Seq[StructType]]("questions")
并且可以使用 for (question <- questionsArray) {...}
进行迭代;但是我无法访问 question.bonusQuestions
或 question.tag
之类的结构字段,其中 returns 是一个错误
error: value tag is not a member of org.apache.spark.sql.types.StructType
Spark 将 StructType
视为 GenericRowWithSchema
,更具体地说是 Row
。因此,您必须使用 Seq[Row]
作为
而不是 Seq[StructType]
val questionsArray = tr.getAs[Seq[Row]]("questions")
并且在循环 for (question <- questionsArray) {...}
中你可以得到 Row
的数据作为
for (question <- questionsArray) {
val tag = question.getAs[String]("tag")
val bonusQuestions = question.getAs[Seq[String]]("bonusQuestions")
val difficulty = question.getAs[String]("difficulty")
val answerOptions = question.getAs[Seq[String]]("answerOptions")
val followUpAnswers = question.getAs[Seq[String]]("followUpAnswers")
}
希望回答对你有帮助
我目前正在将 Scala Spark 用于某些 ETL,并且有一个包含以下架构的基础数据框
|-- round: string (nullable = true)
|-- Id : string (nullable = true)
|-- questions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- tag: string (nullable = true)
| | |-- bonusQuestions: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- difficulty : string (nullable = true)
| | |-- answerOptions: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- followUpAnswers: array (nullable = true)
| | | |-- element: string (containsNull = true)
|-- school: string (nullable = true)
我只需要对round
类型为primary
的行进行ETL(有primary和secondary两种类型)。但是,我在最终 table 中需要这两种类型的行。
我一直在做 ETL,应该根据 -
如果标签是非奖励,bonusQuestions
应设置为 null
,difficulty
应设置为 null
。
我目前能够访问 DF 的大部分字段,例如
val round = tr.getAs[String]("round")
接下来,我可以使用
获取问题数组val questionsArray = tr.getAs[Seq[StructType]]("questions")
并且可以使用 for (question <- questionsArray) {...}
进行迭代;但是我无法访问 question.bonusQuestions
或 question.tag
之类的结构字段,其中 returns 是一个错误
error: value tag is not a member of org.apache.spark.sql.types.StructType
Spark 将 StructType
视为 GenericRowWithSchema
,更具体地说是 Row
。因此,您必须使用 Seq[Row]
作为
Seq[StructType]
val questionsArray = tr.getAs[Seq[Row]]("questions")
并且在循环 for (question <- questionsArray) {...}
中你可以得到 Row
的数据作为
for (question <- questionsArray) {
val tag = question.getAs[String]("tag")
val bonusQuestions = question.getAs[Seq[String]]("bonusQuestions")
val difficulty = question.getAs[String]("difficulty")
val answerOptions = question.getAs[Seq[String]]("answerOptions")
val followUpAnswers = question.getAs[Seq[String]]("followUpAnswers")
}
希望回答对你有帮助