How to group by and merge these rows of a Spark dataframe's group
Suppose I have a table like this:
A | B | C | D | E | F
x1 | 5 | 20200115 | 15 | 4.5 | 1
x1 | 10 | 20200825 | 15 | 5.6 | 19
x2 | 10 | 20200115 | 15 | 4.1 | 1
x2 | 10 | 20200430 | 15 | 9.1 | 1
I would like to merge these rows on col A and produce a dataframe like this:
A | B | C | D | E | F
x1 | 15 | 20200825 | 15 | 5.6 | 19
x2 | 10 | 20200115 | 15 | 4.1 | 1
x2 | 10 | 20200430 | 15 | 9.1 | 1
Basically, if the sum of column B for a column-A group is equal to the value of column D, then:
- the new value of column B will be the sum of column B
- columns C, E and F will be picked from the row with the latest date in column C (dates are in YYYYMMDD format)
Since that condition does not hold for group x2 (the sum of column B is 20, which is greater than column D's 15), I want to keep both records in the target.
Assumption: in my data, column D is the same within a given group (15 in this example).
I have looked at a bunch of group-by and windowing (partitioning) examples, but this seems different to me and I have not been able to narrow down an approach. Can I pipe the grouped data into a UDF and do the work there, along the lines of the sketch at the end of this question?
PS: I am building this in pyspark, so it would be great if your example could be in pyspark.
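This untested sketch is roughly what I had in mind, using a grouped-map UDF via applyInPandas (which, as I understand it, needs pandas and pyarrow; merge_group is just a name I made up and df is the table above loaded as a dataframe), but I am not sure it is the right or most efficient approach:

def merge_group(pdf):
    # pdf is one column-A group as a pandas DataFrame
    if pdf["B"].sum() == pdf["D"].iloc[0]:
        latest = pdf.sort_values("C").tail(1).copy()  # row with the latest C (YYYYMMDD sorts correctly)
        latest["B"] = pdf["B"].sum()                  # carry the group total in B
        return latest
    return pdf                                        # sum of B != D: keep all rows as-is

merged = df.groupBy("A").applyInPandas(merge_group, schema=df.schema)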
Try this - using sum + max window functions.
df.show(false)
df.printSchema()
/**
* +---+---+--------+---+---+---+
* |A |B |C |D |E |F |
* +---+---+--------+---+---+---+
* |x1 |5 |20200115|15 |4.5|1 |
* |x1 |10 |20200825|15 |5.6|19 |
* |x2 |10 |20200115|15 |4.1|1 |
* |x2 |10 |20200430|15 |9.1|1 |
* +---+---+--------+---+---+---+
*
* root
* |-- A: string (nullable = true)
* |-- B: integer (nullable = true)
* |-- C: integer (nullable = true)
* |-- D: integer (nullable = true)
* |-- E: double (nullable = true)
* |-- F: integer (nullable = true)
*/
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"col" syntax; already in scope in spark-shell

val w = Window.partitionBy("A")
df.withColumn("sum", sum("B").over(w))
.withColumn("latestC", max("C").over(w))
.withColumn("retain",
when($"sum" === $"D", when($"latestC" === $"C", true).otherwise(false) )
.otherwise(true) )
.where($"retain" === true)
.withColumn("B", when($"sum" === $"D", when($"latestC" === $"C", $"sum").otherwise($"B") )
.otherwise($"B"))
.show(false)
/**
* +---+---+--------+---+---+---+---+--------+------+
* |A |B |C |D |E |F |sum|latestC |retain|
* +---+---+--------+---+---+---+---+--------+------+
* |x1 |15 |20200825|15 |5.6|19 |15 |20200825|true |
* |x2 |10 |20200115|15 |4.1|1 |20 |20200430|true |
* |x2 |10 |20200430|15 |9.1|1 |20 |20200430|true |
* +---+---+--------+---+---+---+---+--------+------+
*/
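Since the question asks for pyspark, the same sum + max window logic might translate roughly as follows; treat this as an untested sketch of the idea above rather than a verified port:

from pyspark.sql import functions as F, Window as W

w = W.partitionBy("A")
result = (
    df.withColumn("sum", F.sum("B").over(w))
    .withColumn("latestC", F.max("C").over(w))
    # keep every row of a group unless its B total equals D; in that case keep only the latest C
    .where((F.col("sum") != F.col("D")) | (F.col("latestC") == F.col("C")))
    # a collapsed group's surviving row carries the summed B
    .withColumn("B", F.when(F.col("sum") == F.col("D"), F.col("sum")).otherwise(F.col("B")))
    .drop("sum", "latestC")
)
result.show()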
In pyspark, I would do it this way:
from pyspark.sql import functions as F, Window as W
b = ["A", "B", "C", "D", "E", "F"]
a = [
("x1", 5, "20200115", 15, 4.5, 1),
("x1", 10, "20200825", 15, 5.6, 19),
("x2", 10, "20200115", 15, 4.1, 1),
("x2", 10, "20200430", 15, 9.1, 1),
]
df = spark.createDataFrame(a, b)
df = df.withColumn("B_sum", F.sum("B").over(W.partitionBy("A")))  # group-wise sum of B
process_df = df.where("D >= B_sum")  # groups to be collapsed into a single row
no_process_df = df.where("D < B_sum").drop("B_sum")  # groups to keep as-is
# for each collapsed group, keep only the latest row by C and use the group sum as B
process_df = (
process_df.withColumn(
"rng", F.row_number().over(W.partitionBy("A").orderBy(F.col("C").desc()))
)
.where("rng=1")
.select("A", F.col("B_sum").alias("B"), "C", "D", "E", "F",)
)
final_output = process_df.unionByName(no_process_df)  # reassemble collapsed and untouched groups
final_output.show()
+---+---+--------+---+---+---+
| A| B| C| D| E| F|
+---+---+--------+---+---+---+
| x1| 15|20200825| 15|5.6| 19|
| x2| 10|20200115| 15|4.1| 1|
| x2| 10|20200430| 15|9.1| 1|
+---+---+--------+---+---+---+
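Note that this splits on D >= B_sum, which also collapses groups whose B total is strictly below D; if only the exact-equality case described in the question should be collapsed, the two filters can be tightened, for example:

process_df = df.where("D = B_sum")
no_process_df = df.where("D != B_sum").drop("B_sum")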