Selecting the top n rows in each group of a DataFrame in Spark Scala
I have a DataFrame in Spark Scala, and I need to group by id, sort each group by the DateTime value in the second column, and take only the top 5 rows of each group.
+---------------+-------------------+
|             id|           DateTime|
+---------------+-------------------+
|340054675199675|15-01-2018 19:43:23|
|340054675199675|15-01-2018 10:56:43|
|340028465709212|10-01-2018 02:47:11|
|340054675199675|09-01-2018 10:59:10|
|340028465709212|02-01-2018 03:25:35|
|340054675199675|28-12-2017 05:48:04|
|340054675199675|21-12-2017 15:47:51|
|340028465709212|18-12-2017 10:33:04|
|340028465709212|16-12-2017 19:55:40|
|340028465709212|16-12-2017 19:55:40|
|340028465709212|12-12-2017 07:04:51|
|340054675199675|06-12-2017 08:52:38|
+---------------+-------------------+
This is what I have tried so far:

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 10).drop("rn")   // keeps only the row numbered 10, not the top 5

val dfMax = df.groupBy($"id".as("grouped_id")).agg(first($"DateTime").as("max_value")).limit(10)   // collapses each id to a single row
val dfTopByJoin = df.join(broadcast(dfMax),
  ($"id" === $"grouped_id") && ($"DateTime" === $"max_value"))
Scala code for achieving the desired output:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
scala> df2.show
+---------------+-------------------+
| id| DateTime|
+---------------+-------------------+
|340054675199675|15-01-2018 19:43:23|
|340054675199675|15-01-2018 10:56:43|
|340028465709212|10-01-2018 02:47:11|
|340054675199675|09-01-2018 10:59:10|
|340028465709212|02-01-2018 03:25:35|
|340054675199675|28-12-2017 05:48:04|
|340054675199675|21-12-2017 15:47:51|
|340028465709212|18-12-2017 10:33:04|
|340028465709212|16-12-2017 19:55:40|
|340028465709212|16-12-2017 19:55:40|
|340028465709212|12-12-2017 07:04:51|
|340054675199675|06-12-2017 08:52:38|
+---------------+-------------------+
scala> df2.printSchema
root
|-- id: string (nullable = true)
|-- DateTime: string (nullable = true)
The DataFrame column DateTime is a string, so it needs to be converted to a timestamp before the data can be sorted chronologically.
val df3 = df2.withColumn("DateTime", to_timestamp($"DateTime", "dd-MM-yyyy HH:mm:ss"))
scala> df3.printSchema
root
|-- id: string (nullable = true)
|-- DateTime: timestamp (nullable = true)
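One caveat, stated as an assumption about your data rather than anything shown above: to_timestamp returns null for any value that does not match the dd-MM-yyyy HH:mm:ss pattern, so a quick sanity check after the conversion can be worthwhile:

df3.filter($"DateTime".isNull).count   // 0 means every row parsed successfully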
Apply a window function to retrieve the desired output:
val w = Window.partitionBy("id").orderBy("DateTime")
val dfTop = df3.withColumn("rn", row_number().over(w)).filter($"rn" <= 5).drop("rn")
scala> dfTop.show
+---------------+-------------------+
| id| DateTime|
+---------------+-------------------+
|340028465709212|2017-12-12 07:04:51|
|340028465709212|2017-12-16 19:55:40|
|340028465709212|2017-12-16 19:55:40|
|340028465709212|2017-12-18 10:33:04|
|340028465709212|2018-01-02 03:25:35|
|340054675199675|2017-12-06 08:52:38|
|340054675199675|2017-12-21 15:47:51|
|340054675199675|2017-12-28 05:48:04|
|340054675199675|2018-01-09 10:59:10|
|340054675199675|2018-01-15 10:56:43|
+---------------+-------------------+
That gives you the result you want. Happy Hadooping!
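A final note, as a hedged sketch rather than part of the accepted solution: orderBy("DateTime") sorts each partition in ascending order, so the five earliest rows per id are kept. If "top 5" should instead mean the five most recent rows, or if tied timestamps must not be dropped arbitrarily, variants built on the same df3 could look like the following (wDesc, dfLatest and dfTopWithTies are illustrative names; rank() comes from org.apache.spark.sql.functions):

// Most recent 5 rows per id: order the window by DateTime descending
val wDesc = Window.partitionBy("id").orderBy($"DateTime".desc)
val dfLatest = df3.withColumn("rn", row_number().over(wDesc)).filter($"rn" <= 5).drop("rn")

// Keep ties: rank() gives equal DateTime values the same rank, so duplicate
// timestamps at the cut-off (e.g. 16-12-2017 19:55:40) are all retained
val dfTopWithTies = df3.withColumn("rnk", rank().over(wDesc)).filter($"rnk" <= 5).drop("rnk")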