Spark - 如何按键进行条件减少?

Spark - How to do conditional reducing by key?

我有一个包含两列(键、值)的 DataFrame,如下所示:

+------------+--------------------+
|         key|               value|
+------------+--------------------+
|[sid2, sid5]|             value1 |
|      [sid2]|             value2 |
|      [sid6]|             value3 |
+------------+--------------------+

键是一组字符串,我想应用 reduceByKey 转换,如果它们之间存在交集,则两个键相等,输出应如下所示:

+------------+--------------------+
|         key|               value|
+------------+--------------------+
|[sid2, sid5]|   [value1, value2] |
|      [sid6]|             value3 |
+------------+--------------------+

我尝试使用 case class 作为键 wapper 并覆盖 equals 和 hashCode 函数,但它不起作用 (SPARK-2620)。

知道怎么做吗? 提前致谢。

更新 - DataFrame 架构:

root
 |-- id1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- events1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sid: string (nullable = true)
 |    |    |-- uid: string (nullable = true)
 |    |    |-- action: string (nullable = true)
 |    |    |-- touchPoint: string (nullable = true)
 |    |    |-- result: string (nullable = true)
 |    |    |-- timestamp: long (nullable = false)
 |    |    |-- url: string (nullable = true)
 |    |    |-- onlineId: long (nullable = false)
 |    |    |-- channel: string (nullable = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- clientId: long (nullable = false)
 |    |    |-- newUser: boolean (nullable = false)
 |    |    |-- userAgent: string (nullable = true)
 |    |    |-- group: string (nullable = true)
 |    |    |-- pageType: string (nullable = true)
 |    |    |-- clientIP: string (nullable = true)

这无法用 reduceByKey 解决,因为问题定义不适合 byKey 转换。核心要求是密钥具有明确定义的标识,但是这里不是这种情况。

考虑我们有键 [sid2, sid4, sid5][sid2, sid3, sid5] 的数据集。在那种情况下,无法将对象唯一地分配给分区。覆盖哈希码根本帮不了你。

更糟糕的是,一般情况下的问题是分布式的。考虑一组集合,例如对于每个集合,至少有一个具有非空交集的其他集合。在这种情况下,所有值都应合并为一个 "cluster".

总的来说 - 如果没有相当严格的限制,这对 Spark 来说不是一个好问题,根本无法用基本的 byKey 转换来解决。

低效的解决方案,可能会部分解决您的问题是使用笛卡尔积:

rdd.cartesian(rdd)
  .filter { case ((k1, _), (k2, _)) => intersects(v1, v2) }
  .map { case ((k, _), (_, v)) => (k, v) }
  .groupByKey
  .mapValues(_.flatten.toSet)

然而,这是低效的,并且不能解决歧义。

我认为使用 Spark SQL 的数据集 API 是可行的(结果是 @user9003280 的基于 RDD 的解决方案的直接翻译)。

// the dataset
val kvs = Seq(
  (Seq("sid2", "sid5"), "value1"),
  (Seq("sid2"), "value2"),
  (Seq("sid6"), "value3")).toDF("key", "value")
scala> kvs.show
+------------+------+
|         key| value|
+------------+------+
|[sid2, sid5]|value1|
|      [sid2]|value2|
|      [sid6]|value3|
+------------+------+

val intersect = udf { (ss: Seq[String], ts: Seq[String]) => ss intersect ts }
val solution = kvs.as("left")
  .join(kvs.as("right"))
  .where(size(intersect($"left.key", $"right.key")) > 0)
  .select($"left.key", $"right.value")
  .groupBy("key")
  .agg(collect_set("value") as "values")
  .dropDuplicates("values")
scala> solution.show
+------------+----------------+
|         key|          values|
+------------+----------------+
|      [sid6]|        [value3]|
|[sid2, sid5]|[value2, value1]|
+------------+----------------+

我在 100000 行 DataFrame 上尝试了笛卡尔乘积解决方案,处理它花费了很多时间,所以我决定使用图 GraphFrame,在线性时间内计算图的连通分量很简单 (根据图的顶点和边的数量)。

  • 创建顶点和边 DataFrame。
  • 构建图表。
  • 求连通分量。

最终结果如下:

+------------+------+----------
|         key| value|component
+------------+------+----------
|      [sid5]|value1|component1
|      [sid2]|value2|component1
|      [sid6]|value3|component2
+------------+------+-----------

然后groupBy("component")

就是这样:)