How to aggregate values into collection after groupBy?
I have a dataframe with a schema like this:
[visitorId: string, trackingIds: array<string>, emailIds: array<string>]
I'm looking for a way to group (or maybe roll up?) this dataframe by visitorId, so that the trackingIds and emailIds columns are appended together. So, for example, if my initial df looks like:
visitorId | trackingIds  | emailIds
----------+--------------+---------
a158      | [666b]       | [12]
7g21      | [c0b5]       | [45]
7g21      | [c0b4]       | [87]
a158      | [666b, 777c] | []
I would like my output df to look like this:
visitorId | trackingIds        | emailIds
----------+--------------------+---------
a158      | [666b, 666b, 777c] | [12, '']
7g21      | [c0b5, c0b4]       | [45, 87]
I've been trying to use the groupBy and agg operators, but with no luck.
Spark >= 2.4
You can replace the flatten udf with the built-in flatten function:
import org.apache.spark.sql.functions.flatten
and keep the rest as-is.
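For completeness, on Spark 2.4+ the whole aggregation can then be written with only built-in functions. A minimal sketch, assuming the same df as defined in the next section:

```scala
import org.apache.spark.sql.functions.{collect_list, flatten}

// collect_list gathers the per-row arrays into an array of arrays,
// and the built-in flatten concatenates them into a single array
df
  .groupBy($"visitorId")
  .agg(
    flatten(collect_list($"trackingIds")).alias("trackingIds"),
    flatten(collect_list($"emailIds")).alias("emailIds"))
```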
Spark >= 2.0, < 2.4
It is possible, but rather expensive. Using the data you've provided:
case class Record(
visitorId: String, trackingIds: Array[String], emailIds: Array[String])
val df = Seq(
Record("a158", Array("666b"), Array("12")),
Record("7g21", Array("c0b5"), Array("45")),
Record("7g21", Array("c0b4"), Array("87")),
Record("a158", Array("666b", "777c"), Array.empty[String])).toDF
and a helper function:
import org.apache.spark.sql.functions.udf
val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)
we can fill in the blanks with placeholders:
import org.apache.spark.sql.functions.{array, lit, size, when}
val dfWithPlaceholders = df.withColumn(
"emailIds",
when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))
collect_list and flatten:
import org.apache.spark.sql.functions.{array, collect_list}
val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")
dfWithPlaceholders
.groupBy($"visitorId")
.agg(trackingIds, emailIds)
// +---------+------------------+--------+
// |visitorId| trackingIds|emailIds|
// +---------+------------------+--------+
// | a158|[666b, 666b, 777c]| [12, ]|
// | 7g21| [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
With a statically typed Dataset:
df.as[Record]
.groupByKey(_.visitorId)
.mapGroups { case (key, vs) =>
vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
case (trackingIds, emailIds) =>
Record(key, trackingIds.flatten, emailIds.flatten)
}}
// +---------+------------------+--------+
// |visitorId| trackingIds|emailIds|
// +---------+------------------+--------+
// | a158|[666b, 666b, 777c]| [12, ]|
// | 7g21| [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
Spark 1.x
You can convert the data to an RDD and group:
import org.apache.spark.sql.Row
dfWithPlaceholders.rdd
.map {
case Row(id: String,
trcks: Seq[String @ unchecked],
emails: Seq[String @ unchecked]) => (id, (trcks, emails))
}
.groupByKey
.map {case (key, vs) => vs.toArray.unzip match {
case (trackingIds, emailIds) =>
Record(key, trackingIds.flatten, emailIds.flatten)
}}
.toDF
// +---------+------------------+--------+
// |visitorId| trackingIds|emailIds|
// +---------+------------------+--------+
// | 7g21| [c0b5, c0b4]|[45, 87]|
// | a158|[666b, 666b, 777c]| [12, ]|
// +---------+------------------+--------+
@zero323's answer is quite complete, but Spark gives us even more flexibility. How about the following solution?
import org.apache.spark.sql.functions._
inventory
.select($"*", explode($"trackingIds") as "tracking_id")
.select($"*", explode($"emailIds") as "email_id")
.groupBy("visitorId")
.agg(
collect_list("tracking_id") as "trackingIds",
collect_list("email_id") as "emailIds")
However, this excludes all empty collections, so there is some room for improvement :)
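One possible fix, assuming Spark 2.2+: explode_outer emits a row with a null value for an empty array instead of dropping the row entirely, so visitors whose collections are all empty still survive. Since collect_list skips nulls, those visitors come back with empty lists rather than disappearing. A sketch:

```scala
import org.apache.spark.sql.functions.{collect_list, explode_outer}

// explode_outer keeps rows with empty arrays (yielding null),
// unlike explode, which drops them
inventory
  .select($"*", explode_outer($"trackingIds") as "tracking_id")
  .select($"*", explode_outer($"emailIds") as "email_id")
  .groupBy("visitorId")
  .agg(
    collect_list("tracking_id") as "trackingIds",
    collect_list("email_id") as "emailIds")
```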
You can use a user-defined aggregate function (UDAF).
1) Create a custom UDAF with a Scala class named CustomAggregation:
package com.package.name
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
class CustomAggregation() extends UserDefinedAggregateFunction {
// Input Data Type Schema
def inputSchema: StructType = StructType(Array(StructField("col5", ArrayType(StringType))))
// Intermediate Schema
def bufferSchema = StructType(Array(
StructField("col5_collapsed", ArrayType(StringType))))
// Returned Data Type .
def dataType: DataType = ArrayType(StringType)
// Self-explaining
def deterministic = true
// This function is called whenever key changes
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = Array.empty[String] // initialize array
}
// Iterate over each entry of a group
def update(buffer: MutableAggregationBuffer, input: Row) = {
buffer(0) =
if(!input.isNullAt(0))
buffer.getList[String](0).toArray ++ input.getList[String](0).toArray
else
buffer.getList[String](0).toArray
}
// Merge two partial aggregates
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(0) = buffer1.getList[String](0).toArray ++ buffer2.getList[String](0).toArray
}
// Called after all the entries are exhausted.
def evaluate(buffer: Row) = {
buffer.getList[String](0).asScala.toList.distinct
}
}
2) Then use the UDAF in your code:
// instantiate the UDAF
val customAggregation = new CustomAggregation()

df
  .groupBy($"col1", $"col2", $"col3")
  .agg(customAggregation($"col5").as("col5_collapsed"))
  .show()
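Note that UserDefinedAggregateFunction is deprecated since Spark 3.0 in favor of the typed Aggregator API. A sketch of an equivalent aggregator, under that assumption (the name CollectArrays is illustrative):

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Concatenates array columns per group and de-duplicates at the end,
// mirroring the UDAF above
object CollectArrays extends Aggregator[Seq[String], Seq[String], Seq[String]] {
  def zero: Seq[String] = Seq.empty
  def reduce(acc: Seq[String], xs: Seq[String]): Seq[String] =
    if (xs == null) acc else acc ++ xs
  def merge(a: Seq[String], b: Seq[String]): Seq[String] = a ++ b
  def finish(acc: Seq[String]): Seq[String] = acc.distinct
  def bufferEncoder: Encoder[Seq[String]] = ExpressionEncoder()
  def outputEncoder: Encoder[Seq[String]] = ExpressionEncoder()
}

// usage: wrap the Aggregator with udaf() to use it in untyped agg()
df.groupBy($"col1").agg(udaf(CollectArrays).apply($"col5").as("col5_collapsed"))
```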