激发群体内的成对差异

Spark pairwise differences within groups

我有一个 spark 数据框,为了便于讨论,我们假设它是:

val df = sc.parallelize(
    Seq(("a",1,2),("a",1,4),("b",5,6),("b",10,2),("c",1,1))
  ).toDF("id","x","y")
+---+---+---+
| id|  x|  y|
+---+---+---+
|  a|  1|  2|
|  a|  1|  4|
|  b|  5|  6|
|  b| 10|  2|
|  c|  1|  1|
+---+---+---+

我想计算数据帧中具有相同 ID 的条目之间的所有成对差异,并将结果输出到另一个数据帧。对于小型数据框,我可以通过以下方式完成此操作:

df.crossJoin(
  df.select(
    (df.columns.map(x=>col(x).as("_"+x))):_*)
  ).where(
    col("id")===col("_id")
  ).select(
    col("id"),
    (col("x")-col("_x")).as("dx"),
    (col("y")-col("_y")).as("dy")
  )
+---+---+---+
| id| dx| dy|
+---+---+---+
|  c|  0|  0|
|  b|  0|  0|
|  b| -5|  4|
|  b|  5| -4|
|  b|  0|  0|
|  a|  0|  0|
|  a|  0| -2|
|  a|  0|  2|
|  a|  0|  0|
+---+---+---+

但是,对于大型数据帧,这不是一种合理的方法,因为 crossJoin 将主要生成将被后续 where 子句丢弃的数据。

我对 spark 还是很陌生,groupBy 似乎是一个自然而然的开始寻找的地方,但我不知道如何使用 groupBy 来完成这个。欢迎任何帮助。

我最终想删除冗余,例如:

val df1 = df.withColumn("idx",monotonicallyIncreasingId)
df.crossJoin(
  df.select(
    (df.columns.map(x=>col(x).as("_"+x))):_*)
  ).where(
    col("id")===col("_id") && col("idx") < col("_idx")
  ).select(
    col("id"),
    (col("x")-col("_x")).as("dx"),
    (col("y")-col("_y")).as("dy")
  )

+---+---+---+
| id| dx| dy|
+---+---+---+
|  b| -5|  4|
|  a|  0| -2|
+---+---+---+

但如果通过冗余更容易完成此任务,那么我可以接受。

这在 ML 中执行的转换并不少见,所以我认为 MLlib 中的一些东西可能是合适的,但我也没有在那里找到任何东西。

可以通过内连接实现,结果与预期相同:

df.alias("left").join(df.alias("right"),"id")
  .select($"id",
      ($"left.x"-$"right.x").alias("dx"),
      ($"left.y"-$"right.y").alias("dy"))