Spark - 如何按键进行条件减少?
Spark - How to do conditional reducing by key?
我有一个包含两列(键、值)的 DataFrame,如下所示:
+------------+--------------------+
| key| value|
+------------+--------------------+
|[sid2, sid5]| value1 |
| [sid2]| value2 |
| [sid6]| value3 |
+------------+--------------------+
键是一组字符串,我想应用 reduceByKey 转换,如果它们之间存在交集,则两个键相等,输出应如下所示:
+------------+--------------------+
| key| value|
+------------+--------------------+
|[sid2, sid5]| [value1, value2] |
| [sid6]| value3 |
+------------+--------------------+
我尝试使用 case class 作为键 wapper 并覆盖 equals 和 hashCode 函数,但它不起作用 (SPARK-2620)。
知道怎么做吗?
提前致谢。
更新 - DataFrame 架构:
root
|-- id1: array (nullable = true)
| |-- element: string (containsNull = true)
|-- events1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- sid: string (nullable = true)
| | |-- uid: string (nullable = true)
| | |-- action: string (nullable = true)
| | |-- touchPoint: string (nullable = true)
| | |-- result: string (nullable = true)
| | |-- timestamp: long (nullable = false)
| | |-- url: string (nullable = true)
| | |-- onlineId: long (nullable = false)
| | |-- channel: string (nullable = true)
| | |-- category: string (nullable = true)
| | |-- clientId: long (nullable = false)
| | |-- newUser: boolean (nullable = false)
| | |-- userAgent: string (nullable = true)
| | |-- group: string (nullable = true)
| | |-- pageType: string (nullable = true)
| | |-- clientIP: string (nullable = true)
这无法用 reduceByKey
解决,因为问题定义不适合 byKey
转换。核心要求是密钥具有明确定义的标识,但是这里不是这种情况。
考虑我们有键 [sid2, sid4, sid5]
和 [sid2, sid3, sid5]
的数据集。在那种情况下,无法将对象唯一地分配给分区。覆盖哈希码根本帮不了你。
更糟糕的是,一般情况下的问题是分布式的。考虑一组集合,例如对于每个集合,至少有一个具有非空交集的其他集合。在这种情况下,所有值都应合并为一个 "cluster".
总的来说 - 如果没有相当严格的限制,这对 Spark 来说不是一个好问题,根本无法用基本的 byKey
转换来解决。
低效的解决方案,可能会部分解决您的问题是使用笛卡尔积:
rdd.cartesian(rdd)
.filter { case ((k1, _), (k2, _)) => intersects(v1, v2) }
.map { case ((k, _), (_, v)) => (k, v) }
.groupByKey
.mapValues(_.flatten.toSet)
然而,这是低效的,并且不能解决歧义。
我认为使用 Spark SQL 的数据集 API 是可行的(结果是 @user9003280 的基于 RDD 的解决方案的直接翻译)。
// the dataset
val kvs = Seq(
(Seq("sid2", "sid5"), "value1"),
(Seq("sid2"), "value2"),
(Seq("sid6"), "value3")).toDF("key", "value")
scala> kvs.show
+------------+------+
| key| value|
+------------+------+
|[sid2, sid5]|value1|
| [sid2]|value2|
| [sid6]|value3|
+------------+------+
val intersect = udf { (ss: Seq[String], ts: Seq[String]) => ss intersect ts }
val solution = kvs.as("left")
.join(kvs.as("right"))
.where(size(intersect($"left.key", $"right.key")) > 0)
.select($"left.key", $"right.value")
.groupBy("key")
.agg(collect_set("value") as "values")
.dropDuplicates("values")
scala> solution.show
+------------+----------------+
| key| values|
+------------+----------------+
| [sid6]| [value3]|
|[sid2, sid5]|[value2, value1]|
+------------+----------------+
我在 100000 行 DataFrame 上尝试了笛卡尔乘积解决方案,处理它花费了很多时间,所以我决定使用图 GraphFrame,在线性时间内计算图的连通分量很简单 (根据图的顶点和边的数量)。
- 创建顶点和边 DataFrame。
- 构建图表。
- 求连通分量。
最终结果如下:
+------------+------+----------
| key| value|component
+------------+------+----------
| [sid5]|value1|component1
| [sid2]|value2|component1
| [sid6]|value3|component2
+------------+------+-----------
然后groupBy("component")
就是这样:)
我有一个包含两列(键、值)的 DataFrame,如下所示:
+------------+--------------------+
| key| value|
+------------+--------------------+
|[sid2, sid5]| value1 |
| [sid2]| value2 |
| [sid6]| value3 |
+------------+--------------------+
键是一组字符串,我想应用 reduceByKey 转换,如果它们之间存在交集,则两个键相等,输出应如下所示:
+------------+--------------------+
| key| value|
+------------+--------------------+
|[sid2, sid5]| [value1, value2] |
| [sid6]| value3 |
+------------+--------------------+
我尝试使用 case class 作为键 wapper 并覆盖 equals 和 hashCode 函数,但它不起作用 (SPARK-2620)。
知道怎么做吗? 提前致谢。
更新 - DataFrame 架构:
root
|-- id1: array (nullable = true)
| |-- element: string (containsNull = true)
|-- events1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- sid: string (nullable = true)
| | |-- uid: string (nullable = true)
| | |-- action: string (nullable = true)
| | |-- touchPoint: string (nullable = true)
| | |-- result: string (nullable = true)
| | |-- timestamp: long (nullable = false)
| | |-- url: string (nullable = true)
| | |-- onlineId: long (nullable = false)
| | |-- channel: string (nullable = true)
| | |-- category: string (nullable = true)
| | |-- clientId: long (nullable = false)
| | |-- newUser: boolean (nullable = false)
| | |-- userAgent: string (nullable = true)
| | |-- group: string (nullable = true)
| | |-- pageType: string (nullable = true)
| | |-- clientIP: string (nullable = true)
这无法用 reduceByKey
解决,因为问题定义不适合 byKey
转换。核心要求是密钥具有明确定义的标识,但是这里不是这种情况。
考虑我们有键 [sid2, sid4, sid5]
和 [sid2, sid3, sid5]
的数据集。在那种情况下,无法将对象唯一地分配给分区。覆盖哈希码根本帮不了你。
更糟糕的是,一般情况下的问题是分布式的。考虑一组集合,例如对于每个集合,至少有一个具有非空交集的其他集合。在这种情况下,所有值都应合并为一个 "cluster".
总的来说 - 如果没有相当严格的限制,这对 Spark 来说不是一个好问题,根本无法用基本的 byKey
转换来解决。
低效的解决方案,可能会部分解决您的问题是使用笛卡尔积:
rdd.cartesian(rdd)
.filter { case ((k1, _), (k2, _)) => intersects(v1, v2) }
.map { case ((k, _), (_, v)) => (k, v) }
.groupByKey
.mapValues(_.flatten.toSet)
然而,这是低效的,并且不能解决歧义。
我认为使用 Spark SQL 的数据集 API 是可行的(结果是 @user9003280 的基于 RDD 的解决方案的直接翻译)。
// the dataset
val kvs = Seq(
(Seq("sid2", "sid5"), "value1"),
(Seq("sid2"), "value2"),
(Seq("sid6"), "value3")).toDF("key", "value")
scala> kvs.show
+------------+------+
| key| value|
+------------+------+
|[sid2, sid5]|value1|
| [sid2]|value2|
| [sid6]|value3|
+------------+------+
val intersect = udf { (ss: Seq[String], ts: Seq[String]) => ss intersect ts }
val solution = kvs.as("left")
.join(kvs.as("right"))
.where(size(intersect($"left.key", $"right.key")) > 0)
.select($"left.key", $"right.value")
.groupBy("key")
.agg(collect_set("value") as "values")
.dropDuplicates("values")
scala> solution.show
+------------+----------------+
| key| values|
+------------+----------------+
| [sid6]| [value3]|
|[sid2, sid5]|[value2, value1]|
+------------+----------------+
我在 100000 行 DataFrame 上尝试了笛卡尔乘积解决方案,处理它花费了很多时间,所以我决定使用图 GraphFrame,在线性时间内计算图的连通分量很简单 (根据图的顶点和边的数量)。
- 创建顶点和边 DataFrame。
- 构建图表。
- 求连通分量。
最终结果如下:
+------------+------+----------
| key| value|component
+------------+------+----------
| [sid5]|value1|component1
| [sid2]|value2|component1
| [sid6]|value3|component2
+------------+------+-----------
然后groupBy("component")
就是这样:)