如何使用 spark 将镶木地板数据转换为大小写 类?
How to convert parquet data to case classes with spark?
我有很多案例 类 我在 spark 中使用它来将数据保存为 parquet,例如:
case class Person(userId: String,
technographic: Option[Technographic] = None,
geographic: Option[Geographic] = None)
case class Technographic(browsers: Seq[Browser],
devices: Seq[Device],
oss: Seq[Os])
case class Browser(family: String,
major: Option[String] = None,
language: String
...
如何将磁盘上的数据转换回这些大小写类?
我需要能够 select 多列并将它们展开,以便每个列表(例如 browsers
)的所有子列表都具有相同的长度。
例如鉴于此原始数据:
Person(userId="1234",
technographic=Some(Technographic(browsers=Seq(
Browser(family=Some("IE"), major=Some(7), language=Some("en")),
Browser(family=None, major=None, language=Some("en-us")),
Browser(family=Some("Firefox), major=None, language=None)
)),
geographic=Some(Geographic(...))
)
我需要,例如浏览器数据如下(以及能够 select 所有列):
family=IE, major=7, language=en
family=None, major=None, language=en-us
family=Firefox, major=None, language=None
如果 spark 可以 explode
每个列表项,我可以获得。目前它只会做类似的事情(无论如何 explode
不适用于多列):
browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]
那么如何使用 spark 1.5.2 从所有这些嵌套的可选数据中重建用户记录(生成一行数据的整个案例集 类)?
一种可能的方法是:
val df = sqlContext.read.parquet(inputPath)
df.registerTempTable("person")
val fields = df.select("desc person")
df.select("select * from person").map { x =>
... // somehow zip `fields` with the values so that I can
// access values by column name instead of index
// (which is brittle), but how?
}
给定
case class Browser(family: String,
major: Option[Int] = None,
language: String)
case class Tech(browsers: Seq[Browser],
devices: Seq[String],
oss: Seq[String])
case class Person(userId: String,
tech: Option[Tech] = None,
geographic: Option[String] = None)
和 org.apache.spark.sql.Row
的一些便利 types/functions
type A[E] = collection.mutable.WrappedArray[E]
implicit class RichRow(val r: Row) {
def getOpt[T](n: String): Option[T] = {
if (isNullAt(n)) {
None
} else {
Some(r.getAs[T](n))
}
}
def getStringOpt(n: String) = getOpt[String](n)
def getString(n: String) = getStringOpt(n).get
def getIntOpt(n: String) = getOpt[Int](n)
def getInt(n: String) = r.getIntOpt(n).get
def getArray[T](n: String) = r.getAs[A[T]](n)
def getRow(n: String) = r.getAs[Row](n)
def getRows(n: String) = r.getAs[A[Row]](n)
def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}
然后解析可以组织在一些函数中:
def toBrowser(r: Row): Browser = {
Browser(
r.getString("family"),
r.getIntOpt("major"),
r.getString("language"))
}
def toBrowsers(rows: A[Row]): Seq[Browser] = {
rows.map(toBrowser)
}
def toTech(r: Row): Tech = {
Tech(
toBrowsers(r.getRows("browsers")),
r.getArray[String]("devices"),
r.getArray[String]("oss"))
}
def toTechOpt(r: Row): Option[Tech] = {
Option(r).map(toTech)
}
def toPerson(r: Row): Person = {
Person(
r.getString("userId"),
toTechOpt(r.getRow("tech")),
r.getStringOpt("geographic"))
}
所以你可以写
df.map(toPerson).collect().foreach(println)
我已将解析函数组织成 "stand-alone" 方法。我通常会将这些作为 apply
放入案例 class 的伴随对象中,或者也作为 Row
的隐式值 classes。函数的原因是这样更容易粘贴到 spark-shell
每个解析函数直接处理普通列和数组,但在遇到集合时委托给另一个函数(Seq
和 Option
- 它们代表下一个嵌套级别)
implict class
应该 extend AnyVal
,但是同样不能粘贴到 spark-shell
为了详细说明已接受的答案,它没有正确处理空值。您需要尝试将其转换为字符串以查明它是否为空。但是,只有当值为 null 时才会成功 - 如果值为非 null 它将导致转换异常。
迷茫?这是代码:
implicit class RichRow(val r: Row) extends AnyVal {
def getBoolean(n: String) = r.getAs[Boolean](n)
def getBooleanOpt(n: String) = Try(r.getString(n)) match {
case Success(_) => None
case _ => Option(r.getBoolean(n))
}
def getString(n: String) = r.getAs[String](n)
def getStringOpt(n: String) = Option(r.getString(n))
def getLong(n: String) = r.getAs[Long](n)
def getLongOpt(n: String) = Try(r.getString(n)) match {
case Success(_) => None
case _ => Option(r.getLong(n))
}
def getInt(n: String) = r.getAs[Int](n)
def getIntOpt(n: String) = Try(r.getString(n)) match {
case Success(_) => None
case _ => Option(r.getInt(n))
}
def getFloat(n: String) = r.getAs[Float](n)
def getFloatOpt(n: String) = Try(r.getString(n)) match {
case Success(_) => None
case _ => Option(r.getFloat(n))
}
def getArray[T](n: String) = r.getAs[A[T]](n)
def getRow(n: String) = r.getAs[Row](n)
def getRows(n: String): A[Row] = r.getAs[A[Row]](n)
}
}
我有很多案例 类 我在 spark 中使用它来将数据保存为 parquet,例如:
case class Person(userId: String,
technographic: Option[Technographic] = None,
geographic: Option[Geographic] = None)
case class Technographic(browsers: Seq[Browser],
devices: Seq[Device],
oss: Seq[Os])
case class Browser(family: String,
major: Option[String] = None,
language: String
...
如何将磁盘上的数据转换回这些大小写类?
我需要能够 select 多列并将它们展开,以便每个列表(例如 browsers
)的所有子列表都具有相同的长度。
例如鉴于此原始数据:
Person(userId="1234",
technographic=Some(Technographic(browsers=Seq(
Browser(family=Some("IE"), major=Some(7), language=Some("en")),
Browser(family=None, major=None, language=Some("en-us")),
Browser(family=Some("Firefox), major=None, language=None)
)),
geographic=Some(Geographic(...))
)
我需要,例如浏览器数据如下(以及能够 select 所有列):
family=IE, major=7, language=en
family=None, major=None, language=en-us
family=Firefox, major=None, language=None
如果 spark 可以 explode
每个列表项,我可以获得。目前它只会做类似的事情(无论如何 explode
不适用于多列):
browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]
那么如何使用 spark 1.5.2 从所有这些嵌套的可选数据中重建用户记录(生成一行数据的整个案例集 类)?
一种可能的方法是:
val df = sqlContext.read.parquet(inputPath)
df.registerTempTable("person")
val fields = df.select("desc person")
df.select("select * from person").map { x =>
... // somehow zip `fields` with the values so that I can
// access values by column name instead of index
// (which is brittle), but how?
}
给定
case class Browser(family: String,
major: Option[Int] = None,
language: String)
case class Tech(browsers: Seq[Browser],
devices: Seq[String],
oss: Seq[String])
case class Person(userId: String,
tech: Option[Tech] = None,
geographic: Option[String] = None)
和 org.apache.spark.sql.Row
type A[E] = collection.mutable.WrappedArray[E]
implicit class RichRow(val r: Row) {
def getOpt[T](n: String): Option[T] = {
if (isNullAt(n)) {
None
} else {
Some(r.getAs[T](n))
}
}
def getStringOpt(n: String) = getOpt[String](n)
def getString(n: String) = getStringOpt(n).get
def getIntOpt(n: String) = getOpt[Int](n)
def getInt(n: String) = r.getIntOpt(n).get
def getArray[T](n: String) = r.getAs[A[T]](n)
def getRow(n: String) = r.getAs[Row](n)
def getRows(n: String) = r.getAs[A[Row]](n)
def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}
然后解析可以组织在一些函数中:
def toBrowser(r: Row): Browser = {
Browser(
r.getString("family"),
r.getIntOpt("major"),
r.getString("language"))
}
def toBrowsers(rows: A[Row]): Seq[Browser] = {
rows.map(toBrowser)
}
def toTech(r: Row): Tech = {
Tech(
toBrowsers(r.getRows("browsers")),
r.getArray[String]("devices"),
r.getArray[String]("oss"))
}
def toTechOpt(r: Row): Option[Tech] = {
Option(r).map(toTech)
}
def toPerson(r: Row): Person = {
Person(
r.getString("userId"),
toTechOpt(r.getRow("tech")),
r.getStringOpt("geographic"))
}
所以你可以写
df.map(toPerson).collect().foreach(println)
我已将解析函数组织成 "stand-alone" 方法。我通常会将这些作为
apply
放入案例 class 的伴随对象中,或者也作为Row
的隐式值 classes。函数的原因是这样更容易粘贴到spark-shell
每个解析函数直接处理普通列和数组,但在遇到集合时委托给另一个函数(
Seq
和Option
- 它们代表下一个嵌套级别)implict class
应该extend AnyVal
,但是同样不能粘贴到spark-shell
为了详细说明已接受的答案,它没有正确处理空值。您需要尝试将其转换为字符串以查明它是否为空。但是,只有当值为 null 时才会成功 - 如果值为非 null 它将导致转换异常。
迷茫?这是代码:
implicit class RichRow(val r: Row) extends AnyVal {
def getBoolean(n: String) = r.getAs[Boolean](n)
def getBooleanOpt(n: String) = Try(r.getString(n)) match {
case Success(_) => None
case _ => Option(r.getBoolean(n))
}
def getString(n: String) = r.getAs[String](n)
def getStringOpt(n: String) = Option(r.getString(n))
def getLong(n: String) = r.getAs[Long](n)
def getLongOpt(n: String) = Try(r.getString(n)) match {
case Success(_) => None
case _ => Option(r.getLong(n))
}
def getInt(n: String) = r.getAs[Int](n)
def getIntOpt(n: String) = Try(r.getString(n)) match {
case Success(_) => None
case _ => Option(r.getInt(n))
}
def getFloat(n: String) = r.getAs[Float](n)
def getFloatOpt(n: String) = Try(r.getString(n)) match {
case Success(_) => None
case _ => Option(r.getFloat(n))
}
def getArray[T](n: String) = r.getAs[A[T]](n)
def getRow(n: String) = r.getAs[Row](n)
def getRows(n: String): A[Row] = r.getAs[A[Row]](n)
}
}