Java 中的 SparkSQL 和 DataFrame 爆炸式增长
SparkSQL and explode on DataFrame in Java
有没有一种简单的方法可以在 SparkSQL DataFrame
的数组列上使用 explode
?在Scala中比较简单,但是这个函数在Java.
中好像是不可用的(javadoc中有提到)
一个选项是在查询中使用 SQLContext.sql(...)
和 explode
函数,但我正在寻找更好、更简洁的方法。 DataFrame
s 是从 parquet 文件加载的。
似乎可以使用 org.apache.spark.sql.functions.explode(Column col)
和 DataFrame.withColumn(String colName, Column col)
的组合来用它的展开版本替换该列。
我以这种方式解决了它:假设您有一个数组列,其中包含名为 "positions" 的职位描述,每个人 "fullName"。
然后你从初始模式中得到:
root
|-- fullName: string (nullable = true)
|-- positions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- companyName: string (nullable = true)
| | |-- title: string (nullable = true)
...
到架构:
root
|-- personName: string (nullable = true)
|-- companyName: string (nullable = true)
|-- positionTitle: string (nullable = true)
通过做:
DataFrame personPositions = persons.select(persons.col("fullName").as("personName"),
org.apache.spark.sql.functions.explode(persons.col("positions")).as("pos"));
DataFrame test = personPositions.select(personPositions.col("personName"),
personPositions.col("pos").getField("companyName").as("companyName"), personPositions.col("pos").getField("title").as("positionTitle"));
有没有一种简单的方法可以在 SparkSQL DataFrame
的数组列上使用 explode
?在Scala中比较简单,但是这个函数在Java.
一个选项是在查询中使用 SQLContext.sql(...)
和 explode
函数,但我正在寻找更好、更简洁的方法。 DataFrame
s 是从 parquet 文件加载的。
似乎可以使用 org.apache.spark.sql.functions.explode(Column col)
和 DataFrame.withColumn(String colName, Column col)
的组合来用它的展开版本替换该列。
我以这种方式解决了它:假设您有一个数组列,其中包含名为 "positions" 的职位描述,每个人 "fullName"。
然后你从初始模式中得到:
root
|-- fullName: string (nullable = true)
|-- positions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- companyName: string (nullable = true)
| | |-- title: string (nullable = true)
...
到架构:
root
|-- personName: string (nullable = true)
|-- companyName: string (nullable = true)
|-- positionTitle: string (nullable = true)
通过做:
DataFrame personPositions = persons.select(persons.col("fullName").as("personName"),
org.apache.spark.sql.functions.explode(persons.col("positions")).as("pos"));
DataFrame test = personPositions.select(personPositions.col("personName"),
personPositions.col("pos").getField("companyName").as("companyName"), personPositions.col("pos").getField("title").as("positionTitle"));