激发群体内的成对差异
Spark pairwise differences within groups
我有一个 spark 数据框,为了便于讨论,我们假设它是:
val df = sc.parallelize(
Seq(("a",1,2),("a",1,4),("b",5,6),("b",10,2),("c",1,1))
).toDF("id","x","y")
+---+---+---+
| id| x| y|
+---+---+---+
| a| 1| 2|
| a| 1| 4|
| b| 5| 6|
| b| 10| 2|
| c| 1| 1|
+---+---+---+
我想计算数据帧中具有相同 ID 的条目之间的所有成对差异,并将结果输出到另一个数据帧。对于小型数据框,我可以通过以下方式完成此操作:
df.crossJoin(
df.select(
(df.columns.map(x=>col(x).as("_"+x))):_*)
).where(
col("id")===col("_id")
).select(
col("id"),
(col("x")-col("_x")).as("dx"),
(col("y")-col("_y")).as("dy")
)
+---+---+---+
| id| dx| dy|
+---+---+---+
| c| 0| 0|
| b| 0| 0|
| b| -5| 4|
| b| 5| -4|
| b| 0| 0|
| a| 0| 0|
| a| 0| -2|
| a| 0| 2|
| a| 0| 0|
+---+---+---+
但是,对于大型数据帧,这不是一种合理的方法,因为 crossJoin 将主要生成将被后续 where 子句丢弃的数据。
我对 spark 还是很陌生,groupBy 似乎是一个自然而然的开始寻找的地方,但我不知道如何使用 groupBy 来完成这个。欢迎任何帮助。
我最终想删除冗余,例如:
val df1 = df.withColumn("idx",monotonicallyIncreasingId)
df.crossJoin(
df.select(
(df.columns.map(x=>col(x).as("_"+x))):_*)
).where(
col("id")===col("_id") && col("idx") < col("_idx")
).select(
col("id"),
(col("x")-col("_x")).as("dx"),
(col("y")-col("_y")).as("dy")
)
+---+---+---+
| id| dx| dy|
+---+---+---+
| b| -5| 4|
| a| 0| -2|
+---+---+---+
但如果通过冗余更容易完成此任务,那么我可以接受。
这在 ML 中执行的转换并不少见,所以我认为 MLlib 中的一些东西可能是合适的,但我也没有在那里找到任何东西。
可以通过内连接实现,结果与预期相同:
df.alias("left").join(df.alias("right"),"id")
.select($"id",
($"left.x"-$"right.x").alias("dx"),
($"left.y"-$"right.y").alias("dy"))
我有一个 spark 数据框,为了便于讨论,我们假设它是:
val df = sc.parallelize(
Seq(("a",1,2),("a",1,4),("b",5,6),("b",10,2),("c",1,1))
).toDF("id","x","y")
+---+---+---+
| id| x| y|
+---+---+---+
| a| 1| 2|
| a| 1| 4|
| b| 5| 6|
| b| 10| 2|
| c| 1| 1|
+---+---+---+
我想计算数据帧中具有相同 ID 的条目之间的所有成对差异,并将结果输出到另一个数据帧。对于小型数据框,我可以通过以下方式完成此操作:
df.crossJoin(
df.select(
(df.columns.map(x=>col(x).as("_"+x))):_*)
).where(
col("id")===col("_id")
).select(
col("id"),
(col("x")-col("_x")).as("dx"),
(col("y")-col("_y")).as("dy")
)
+---+---+---+
| id| dx| dy|
+---+---+---+
| c| 0| 0|
| b| 0| 0|
| b| -5| 4|
| b| 5| -4|
| b| 0| 0|
| a| 0| 0|
| a| 0| -2|
| a| 0| 2|
| a| 0| 0|
+---+---+---+
但是,对于大型数据帧,这不是一种合理的方法,因为 crossJoin 将主要生成将被后续 where 子句丢弃的数据。
我对 spark 还是很陌生,groupBy 似乎是一个自然而然的开始寻找的地方,但我不知道如何使用 groupBy 来完成这个。欢迎任何帮助。
我最终想删除冗余,例如:
val df1 = df.withColumn("idx",monotonicallyIncreasingId)
df.crossJoin(
df.select(
(df.columns.map(x=>col(x).as("_"+x))):_*)
).where(
col("id")===col("_id") && col("idx") < col("_idx")
).select(
col("id"),
(col("x")-col("_x")).as("dx"),
(col("y")-col("_y")).as("dy")
)
+---+---+---+
| id| dx| dy|
+---+---+---+
| b| -5| 4|
| a| 0| -2|
+---+---+---+
但如果通过冗余更容易完成此任务,那么我可以接受。
这在 ML 中执行的转换并不少见,所以我认为 MLlib 中的一些东西可能是合适的,但我也没有在那里找到任何东西。
可以通过内连接实现,结果与预期相同:
df.alias("left").join(df.alias("right"),"id")
.select($"id",
($"left.x"-$"right.x").alias("dx"),
($"left.y"-$"right.y").alias("dy"))