在 Spark DataFrame 中查找每组的最大行数
Find maximum row per group in Spark DataFrame
我正在尝试使用 Spark 数据帧而不是 RDD,因为它们看起来比 RDD 更高级,而且往往会生成更易读的代码。
在一个 14 节点 Google Dataproc 集群中,我有大约 600 万个名称被两个不同的系统转换为 ID:sa
和 sb
。每个 Row
包含 name
、id_sa
和 id_sb
。我的目标是生成一个从 id_sa
到 id_sb
的映射,这样对于每个 id_sa
,对应的 id_sb
是附加到 [=16= 的所有名称中最常见的 ID ].
让我们试着用一个例子来说明。如果我有以下行:
[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]
我的目标是生成从 a1
到 b2
的映射。实际上,与 a1
关联的名称是 n1
、n2
和 n3
,它们分别映射到 b1
、b2
和 b2
,因此 b2
是与 a1
关联的名称中最频繁的映射。同理,a2
会映射到b2
。可以假设总会有赢家:不需要打破平局。
我希望我可以在我的数据框上使用 groupBy(df.id_sa)
,但我不知道下一步该怎么做。我希望最终可以生成以下行的聚合:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
但也许我正在尝试使用错误的工具,我应该回去使用 RDD。
使用join
(在并列的情况下会导致组中多行):
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")
cnts.join(maxs,
(col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
使用 window 函数(将取消联系):
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())
(cnts
.withColumn("rn", row_number().over(w))
.where(col("rn") == 1)
.select("id_sa", "id_sb"))
使用 struct
排序:
from pyspark.sql.functions import struct
(cnts
.groupBy("id_sa")
.agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
.select(col("id_sa"), col("max.id_sb")))
另见
我认为您可能正在寻找 window 函数:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
这是 Scala 中的一个示例(我现在没有带 Hive 的 Spark Shell,所以我无法测试代码,但我认为它应该可以工作):
case class MyRow(name: String, id_sa: String, id_sb: String)
val myDF = sc.parallelize(Array(
MyRow("n1", "a1", "b1"),
MyRow("n2", "a1", "b2"),
MyRow("n3", "a1", "b2"),
MyRow("n1", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)
myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")
可能有更有效的方法可以使用 Window 函数实现相同的结果,但我希望这能为您指明正确的方向。
我正在尝试使用 Spark 数据帧而不是 RDD,因为它们看起来比 RDD 更高级,而且往往会生成更易读的代码。
在一个 14 节点 Google Dataproc 集群中,我有大约 600 万个名称被两个不同的系统转换为 ID:sa
和 sb
。每个 Row
包含 name
、id_sa
和 id_sb
。我的目标是生成一个从 id_sa
到 id_sb
的映射,这样对于每个 id_sa
,对应的 id_sb
是附加到 [=16= 的所有名称中最常见的 ID ].
让我们试着用一个例子来说明。如果我有以下行:
[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]
我的目标是生成从 a1
到 b2
的映射。实际上,与 a1
关联的名称是 n1
、n2
和 n3
,它们分别映射到 b1
、b2
和 b2
,因此 b2
是与 a1
关联的名称中最频繁的映射。同理,a2
会映射到b2
。可以假设总会有赢家:不需要打破平局。
我希望我可以在我的数据框上使用 groupBy(df.id_sa)
,但我不知道下一步该怎么做。我希望最终可以生成以下行的聚合:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
但也许我正在尝试使用错误的工具,我应该回去使用 RDD。
使用join
(在并列的情况下会导致组中多行):
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")
cnts.join(maxs,
(col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
使用 window 函数(将取消联系):
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())
(cnts
.withColumn("rn", row_number().over(w))
.where(col("rn") == 1)
.select("id_sa", "id_sb"))
使用 struct
排序:
from pyspark.sql.functions import struct
(cnts
.groupBy("id_sa")
.agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
.select(col("id_sa"), col("max.id_sb")))
另见
我认为您可能正在寻找 window 函数: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
这是 Scala 中的一个示例(我现在没有带 Hive 的 Spark Shell,所以我无法测试代码,但我认为它应该可以工作):
case class MyRow(name: String, id_sa: String, id_sb: String)
val myDF = sc.parallelize(Array(
MyRow("n1", "a1", "b1"),
MyRow("n2", "a1", "b2"),
MyRow("n3", "a1", "b2"),
MyRow("n1", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)
myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")
可能有更有效的方法可以使用 Window 函数实现相同的结果,但我希望这能为您指明正确的方向。