How can I create a Spark DataFrame from a nested array of struct elements?
I have read a JSON file into Spark. The file has the following structure:
scala> tweetBlob.printSchema
root
|-- related: struct (nullable = true)
| |-- next: struct (nullable = true)
| | |-- href: string (nullable = true)
|-- search: struct (nullable = true)
| |-- current: long (nullable = true)
| |-- results: long (nullable = true)
|-- tweets: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- cde: struct (nullable = true)
...
...
| | |-- cdeInternal: struct (nullable = true)
...
...
| | |-- message: struct (nullable = true)
...
...
Ideally, I would like a DataFrame with the columns "cde", "cdeInternal", "message", ... as shown below:
root
|-- cde: struct (nullable = true)
...
...
|-- cdeInternal: struct (nullable = true)
...
...
|-- message: struct (nullable = true)
...
...
I have managed to use "explode" to extract the elements of the "tweets" array into a column named "tweets":
scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets"))
tweets: org.apache.spark.sql.DataFrame = [tweets: struct<cde:struct<author:struct<gender:string,location:struct<city:string,country:string,state:string>,maritalStatus:struct<evidence:string,isMarried:string>,parenthood:struct<evidence:string,isParent:string>>,content:struct<sentiment:struct<evidence:array<struct<polarity:string,sentimentTerm:string>>,polarity:string>>>,cdeInternal:struct<compliance:struct<isActive:boolean,userProtected:boolean>,tracks:array<struct<id:string>>>,message:struct<actor:struct<displayName:string,favoritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:string,image:string,languages:array<string>,link:string,links:array<struct<href:string,rel:string>>,listedCount:bigint,location:struct<displayName:string,objectType:string>,objectType:string,postedTime...
scala> tweets.printSchema
root
|-- tweets: struct (nullable = true)
| |-- cde: struct (nullable = true)
...
...
| |-- cdeInternal: struct (nullable = true)
...
...
| |-- message: struct (nullable = true)
...
...
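How can I select all the columns inside the struct and create a DataFrame from them? If my understanding is correct, explode does not work on structs.
Any help is appreciated.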
One possible approach is to extract the required information from the schema. Let's start with some dummy data:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col  // needed by the helper function
import org.apache.spark.sql.types._
case class Bar(x: Int, y: String)
case class Foo(bar: Bar)
val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF
df.printSchema
// root
// |-- bar: struct (nullable = true)
// | |-- x: integer (nullable = false)
// | |-- y: string (nullable = true)
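And a helper function:
// Returns one Column per field of the struct column `colname`
// (an empty array if that column is not a struct).
def children(colname: String, df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  fields.map(x => col(s"$colname.${x.name}"))
}
The final result: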
df.select(children("bar", df): _*).printSchema
// root
// |-- x: integer (nullable = true)
// |-- y: string (nullable = true)
You can use the following pattern:
df
  .select(explode(col("path_to_collection")).as("collection"))
  .select(col("collection.*"))
Example:
scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""
scala> val inline = sqlContext.read.json(sc.parallelize(json :: Nil)).select(explode(col("schools")).as("collection")).select(col("collection.*"))
scala> inline.printSchema
root
|-- sname: string (nullable = true)
|-- year: long (nullable = true)
scala> inline.show
+--------+----+
| sname|year|
+--------+----+
|stanford|2010|
|berkeley|2012|
+--------+----+
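Applied to the schema from the question, the same two-step pattern might look like the sketch below. It is only an illustration under assumptions: it uses a local SparkSession (Spark 2.2 or later) rather than the sqlContext shown above, and the JSON document and its inner field names ("a", "body") are hypothetical stand-ins for the real tweetBlob data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Assumption: Spark 2.2+ with a local session; the answers above use the older sqlContext API.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("flatten-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for tweetBlob: one top-level struct plus an array of structs.
val json =
  """{"search":{"current":2},"tweets":[{"cde":{"a":1},"message":{"body":"hi"}},{"cde":{"a":2},"message":{"body":"yo"}}]}"""
val tweetBlob = spark.read.json(Seq(json).toDS)

// Step 1: explode produces one row per array element, held in a single struct column.
// Step 2: "tweets.*" promotes the fields of that struct to top-level columns.
val tweets = tweetBlob
  .select(explode(col("tweets")).as("tweets"))
  .select(col("tweets.*"))

tweets.printSchema()
```

The nested structs (here "cde" and "message") stay intact as struct columns; only the outer "tweets" wrapper is removed, which matches the target schema described in the question.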
Alternatively, you can use the SQL function inline:
scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""
scala> sqlContext.read.json(sc.parallelize(json :: Nil)).registerTempTable("tmp")
scala> val inline = sqlContext.sql("SELECT inline(schools) FROM tmp")
scala> inline.printSchema
root
|-- sname: string (nullable = true)
|-- year: long (nullable = true)
scala> inline.show
+--------+----+
| sname|year|
+--------+----+
|stanford|2010|
|berkeley|2012|
+--------+----+
scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> case class Bar(x: Int, y: String)
defined class Bar
scala> case class Foo(bar: Bar)
defined class Foo
scala> val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF
df: org.apache.spark.sql.DataFrame = [bar: struct<x: int, y: string>]
scala> df.printSchema
root
|-- bar: struct (nullable = true)
| |-- x: integer (nullable = false)
| |-- y: string (nullable = true)
scala> df.select("bar.*").printSchema
root
|-- x: integer (nullable = true)
|-- y: string (nullable = true)
scala>