如何使用 spark 将镶木地板数据转换为大小写 类?

How to convert parquet data to case classes with spark?

我有很多案例 类 我在 spark 中使用它来将数据保存为 parquet,例如:

case class Person(userId: String,
              technographic: Option[Technographic] = None,
              geographic: Option[Geographic] = None)

case class Technographic(browsers: Seq[Browser], 
                     devices: Seq[Device],
                     oss: Seq[Os])

case class Browser(family: String,
               major: Option[String] = None, 
               language: String

...

如何将磁盘上的数据转换回这些大小写类?

我需要能够 select 多列并将它们展开,以便每个列表(例如 browsers)的所有子列表都具有相同的长度。

例如鉴于此原始数据:

Person(userId="1234",
  technographic=Some(Technographic(browsers=Seq(
    Browser(family=Some("IE"), major=Some(7), language=Some("en")),
    Browser(family=None, major=None, language=Some("en-us")),
    Browser(family=Some("Firefox), major=None, language=None)
  )),
  geographic=Some(Geographic(...))
)

我需要,例如浏览器数据如下(以及能够 select 所有列):

family=IE, major=7, language=en
family=None, major=None, language=en-us
family=Firefox, major=None, language=None

如果 spark 可以 explode 每个列表项,我可以获得。目前它只会做类似的事情(无论如何 explode 不适用于多列):

browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]

那么如何使用 spark 1.5.2 从所有这些嵌套的可选数据中重建用户记录(生成一行数据的整个案例集 类)?

一种可能的方法是:

val df = sqlContext.read.parquet(inputPath)
df.registerTempTable("person")
val fields = df.select("desc person")
df.select("select * from person").map { x => 
  ... // somehow zip `fields` with the values so that I can 
      // access values by column name instead of index 
      // (which is brittle), but how?
}

给定

case class Browser(family: String,
                   major: Option[Int] = None,
                   language: String)

case class Tech(browsers: Seq[Browser],
                devices: Seq[String],
                oss: Seq[String])


case class Person(userId: String,
                  tech: Option[Tech] = None,
                  geographic: Option[String] = None)

org.apache.spark.sql.Row

的一些便利 types/functions
type A[E] = collection.mutable.WrappedArray[E]

implicit class RichRow(val r: Row) {
  def getOpt[T](n: String): Option[T] = {
    if (isNullAt(n)) {
      None
    } else {
      Some(r.getAs[T](n))
    }
  }

  def getStringOpt(n: String) = getOpt[String](n)
  def getString(n: String) = getStringOpt(n).get

  def getIntOpt(n: String) = getOpt[Int](n)
  def getInt(n: String) = r.getIntOpt(n).get

  def getArray[T](n: String) = r.getAs[A[T]](n)

  def getRow(n: String) = r.getAs[Row](n)
  def getRows(n: String) = r.getAs[A[Row]](n)

  def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}

然后解析可以组织在一些函数中:

def toBrowser(r: Row): Browser = {
  Browser(
    r.getString("family"),
    r.getIntOpt("major"),
    r.getString("language"))
}

def toBrowsers(rows: A[Row]): Seq[Browser] = {
  rows.map(toBrowser)
}

def toTech(r: Row): Tech = {
  Tech(
    toBrowsers(r.getRows("browsers")),
    r.getArray[String]("devices"),
    r.getArray[String]("oss"))
}

def toTechOpt(r: Row): Option[Tech] = {
  Option(r).map(toTech)
}

def toPerson(r: Row): Person = {
  Person(
    r.getString("userId"),
    toTechOpt(r.getRow("tech")),
    r.getStringOpt("geographic"))
}

所以你可以写

df.map(toPerson).collect().foreach(println)

  • 我已将解析函数组织成 "stand-alone" 方法。我通常会将这些作为 apply 放入案例 class 的伴随对象中,或者也作为 Row 的隐式值 classes。函数的原因是这样更容易粘贴到 spark-shell

  • 每个解析函数直接处理普通列和数组,但在遇到集合时委托给另一个函数(SeqOption - 它们代表下一个嵌套级别)

  • implict class 应该 extend AnyVal,但是同样不能粘贴到 spark-shell

为了详细说明已接受的答案,它没有正确处理空值。您需要尝试将其转换为字符串以查明它是否为空。但是,只有当值为 null 时才会成功 - 如果值为非 null 它将导致转换异常。

迷茫?这是代码:

implicit class RichRow(val r: Row) extends AnyVal {

    def getBoolean(n: String) = r.getAs[Boolean](n)
    def getBooleanOpt(n: String) = Try(r.getString(n)) match {
      case Success(_) => None
      case _ => Option(r.getBoolean(n))
    }

    def getString(n: String) = r.getAs[String](n)
    def getStringOpt(n: String) = Option(r.getString(n))

    def getLong(n: String) = r.getAs[Long](n)
    def getLongOpt(n: String) = Try(r.getString(n)) match {
      case Success(_) => None
      case _ => Option(r.getLong(n))
    } 

    def getInt(n: String) = r.getAs[Int](n)
    def getIntOpt(n: String) = Try(r.getString(n)) match {
      case Success(_) => None
      case _ => Option(r.getInt(n))
    } 

    def getFloat(n: String) = r.getAs[Float](n)
    def getFloatOpt(n: String) = Try(r.getString(n)) match {
      case Success(_) => None
      case _ => Option(r.getFloat(n))
    }

    def getArray[T](n: String) = r.getAs[A[T]](n)

    def getRow(n: String) = r.getAs[Row](n)
    def getRows(n: String): A[Row] = r.getAs[A[Row]](n)
  }
}