Aggregate over time windows on a partitioned/grouped DataFrame
I am new to Spark and still learning. I have this Spark DataFrame, and I want to order it by date and get the latest record per partition of 'ID1', 'ID2' and 'record_type'.
My input looks like this:
from pyspark.sql import functions as F

data = [
    ("ACC.PXP", "7246", "2018-10-18T16:20:00", "Hospital", None, "IN"),
    ("ACC.PXP", "7246", "2018-10-18T16:20:00", None, "Foundation", "IN"),
    ("ACC.PXP", "7246", "2018-11-10T00:00:00", "Hospital", "Foundation", "IN"),
    ("ACC.PXP", "7246", "2018-11-11T00:00:00", None, "Washington", "OUT"),
    ("ACC.PXP", "7246", "2018-11-12T00:00:00", "Hospital", None, "OUT"),
    ("ACC.PXP", "7246", "2018-11-15T04:00:00", "Home", None, "IN"),
    ("ACC.PXP", "7246", "2018-11-15T04:00:00", "Home", None, "IN"),
    ("ACC.PXP", "7246", "2020-12-04T15:00:00", "Care", "Betel", "OUT"),
    ("ACC.PXP", "7246", "2020-13-04T15:00:00", "Care", None, "OUT"),
]
df = spark.createDataFrame(
    data=data, schema=["ID1", "ID2", "date", "type", "name", "record_type"]
)
df.orderBy(F.col("date")).show(truncate=False)
+-------+----+-------------------+--------+----------+-----------+
|ID1 |ID2 |date |type |name |record_type|
+-------+----+-------------------+--------+----------+-----------+
|ACC.PXP|7246|2018-10-18T16:20:00|null |Foundation|IN |
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|null |IN |
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation|IN |
|ACC.PXP|7246|2018-11-11T00:00:00|null |Washington|OUT |
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|null |OUT |
|ACC.PXP|7246|2018-11-15T04:00:00|Home |null |IN |
|ACC.PXP|7246|2018-11-15T04:00:00|Home |null |IN |
|ACC.PXP|7246|2020-12-04T15:00:00|Care |Betel |OUT |
|ACC.PXP|7246|2020-13-04T15:00:00|Care |null |OUT |
+-------+----+-------------------+--------+----------+-----------+
...and my expected output would look like this:
data2 = [
    ("ACC.PXP", "7246", "2018-11-10T00:00:00", "Hospital", "Foundation", "IN"),
    ("ACC.PXP", "7246", "2018-11-12T00:00:00", "Hospital", "Washington", "OUT"),
    ("ACC.PXP", "7246", "2018-11-15T04:00:00", "Home", None, "IN"),
    ("ACC.PXP", "7246", "2020-13-04T15:00:00", "Care", "Betel", "OUT"),
]
sdf = spark.createDataFrame(
    data=data2, schema=["ID1", "ID2", "date", "type", "name", "record_type"]
)
sdf.orderBy(F.col("date")).show(truncate=False)
+-------+----+-------------------+--------+----------+-----------+
|ID1 |ID2 |date |type |name |record_type|
+-------+----+-------------------+--------+----------+-----------+
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation|IN |
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington|OUT |
|ACC.PXP|7246|2018-11-15T04:00:00|Home |null |IN |
|ACC.PXP|7246|2020-13-04T15:00:00|Care |Betel |OUT |
+-------+----+-------------------+--------+----------+-----------+
Here is what I tried, and it seems to work for this sample dataset. However, when I test it on the actual data, the logic appears to pick only one 'IN' and one 'OUT' record. Any input is greatly appreciated.
from pyspark.sql import functions as F, Window
from pyspark.sql.functions import col, last, when

# w1 is referenced below, but its definition is not included in this snippet
w2 = Window.partitionBy("ID1", "ID2", "type", "date").orderBy(F.desc("date"))
w3 = Window.partitionBy("ID1", "ID2", "type").orderBy(F.asc("date"))
w4 = Window.partitionBy("ID1", "ID2", "type").orderBy(F.desc("date"))

df1 = (
    df.withColumn(
        "type",
        when(col("type").isNotNull(), col("type")).otherwise(
            last("type", True).over(w1)
        ),
    )
    .withColumn(
        "name",
        when(col("name").isNotNull(), col("name")).otherwise(
            last("name", True).over(w1)
        ),
    )
    .withColumn("row_number", F.row_number().over(w2))
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)

df2 = (
    df1.withColumn(
        "type",
        when(col("type").isNotNull(), col("type")).otherwise(
            last("type", True).over(w3)
        ),
    )
    .withColumn(
        "name",
        when(col("name").isNotNull(), col("name")).otherwise(
            F.last("name", True).over(w3)
        ),
    )
    .withColumn("GroupingSeq", F.row_number().over(w4))
    .filter(F.col("GroupingSeq") == 1)
    .drop("GroupingSeq")
)
df2.orderBy(F.asc("date")).show()
First, you need to assign a group ID:
from pyspark.sql import functions as F, Window as W

df2 = (
    df.withColumn(
        "id",
        # flag the start of a new group: 1 when record_type changes, 0 otherwise
        F.when(
            F.lag("record_type").over(W.partitionBy("ID1", "ID2").orderBy("date"))
            == F.col("record_type"),
            0,
        ).otherwise(1),
    )
    # a running sum of the flags turns them into a consecutive group ID
    .withColumn("id", F.sum("id").over(W.partitionBy("ID1", "ID2").orderBy("date")))
)
df2.show()
+-------+----+-------------------+--------+----------+-----------+---+
| ID1| ID2| date| type| name|record_type| id|
+-------+----+-------------------+--------+----------+-----------+---+
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital| null| IN| 1|
|ACC.PXP|7246|2018-10-18T16:20:00| null|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-11T00:00:00| null|Washington| OUT| 2|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital| null| OUT| 2|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2020-12-04T15:00:00| Care| Betel| OUT| 4|
|ACC.PXP|7246|2020-13-04T15:00:00| Care| null| OUT| 4|
+-------+----+-------------------+--------+----------+-----------+---+
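This lag-plus-running-sum pattern is the usual "gaps and islands" trick: the flag marks each row where record_type changes, and the cumulative sum turns every run of identical values into its own id. If you want to sanity-check the grouping, a quick aggregation (a minimal sketch using the same column names) should show exactly one distinct record_type per id:

# each (ID1, ID2, id) group should contain exactly one distinct record_type
df2.groupBy("ID1", "ID2", "id").agg(
    F.countDistinct("record_type").alias("n_types"),
    F.count("*").alias("n_rows"),
).show()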
Then, you fill in the columns that have null values:
df3 = df2.withColumn(
    "name",
    # within each group, replace nulls with the group's (max) non-null value
    F.coalesce(
        F.col("name"),
        F.max("name").over(W.partitionBy("ID1", "ID2", "id")),
    ),
).withColumn(
    "type",
    F.coalesce(
        F.col("type"),
        F.max("type").over(W.partitionBy("ID1", "ID2", "id")),
    ),
)
df3.show()
+-------+----+-------------------+--------+----------+-----------+---+
| ID1| ID2| date| type| name|record_type| id|
+-------+----+-------------------+--------+----------+-----------+---+
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-11T00:00:00|Hospital|Washington| OUT| 2|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington| OUT| 2|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2020-12-04T15:00:00| Care| Betel| OUT| 4|
|ACC.PXP|7246|2020-13-04T15:00:00| Care| Betel| OUT| 4|
+-------+----+-------------------+--------+----------+-----------+---+
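F.max is used here only because aggregate functions skip nulls, so it picks up whatever non-null value exists in the group. If a group could hold several different non-null values and you would rather take the first non-null one, F.first with ignorenulls=True over the same window is an alternative (a sketch, shown for "name" only; the partition-only window gives no ordering guarantee):

# alternative null-filling: first non-null value per group
df3_alt = df2.withColumn(
    "name",
    F.coalesce(
        F.col("name"),
        F.first("name", ignorenulls=True).over(W.partitionBy("ID1", "ID2", "id")),
    ),
)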
Finally, you select the "last" row for each tuple ("ID1", "ID2", "id"):
df4 = df3.withColumn(
    "row",
    # number rows within each group, latest date first
    F.row_number().over(
        W.partitionBy("ID1", "ID2", "id").orderBy(F.col("date").desc())
    ),
).where("row = 1").drop("row", "id")
df4.show()
+-------+----+-------------------+--------+----------+-----------+
| ID1| ID2| date| type| name|record_type|
+-------+----+-------------------+--------+----------+-----------+
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation| IN|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington| OUT|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN|
|ACC.PXP|7246|2020-13-04T15:00:00| Care| Betel| OUT|
+-------+----+-------------------+--------+----------+-----------+
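For reference, the three steps can be chained into a single pipeline. This is just the same logic written in one pass, using the df and column names defined above, not a different method:

from pyspark.sql import functions as F, Window as W

w_order = W.partitionBy("ID1", "ID2").orderBy("date")
w_group = W.partitionBy("ID1", "ID2", "id")

result = (
    df
    # 1. group ID: start a new group whenever record_type changes
    .withColumn(
        "id",
        F.when(
            F.lag("record_type").over(w_order) == F.col("record_type"), 0
        ).otherwise(1),
    )
    .withColumn("id", F.sum("id").over(w_order))
    # 2. fill nulls within each group
    .withColumn("name", F.coalesce("name", F.max("name").over(w_group)))
    .withColumn("type", F.coalesce("type", F.max("type").over(w_group)))
    # 3. keep only the latest row of each group
    .withColumn(
        "row", F.row_number().over(w_group.orderBy(F.col("date").desc()))
    )
    .where("row = 1")
    .drop("row", "id")
)
result.orderBy("date").show(truncate=False)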