计算日期在每个 ID 的日期范围内的行
Count Rows where a date is within a date range for each ID
我们有一个数据框(数百万行),包括:
- 编号
- 开始日期
- 结束日期
- 日期
对于每一行,我们采用日期变量并希望计算每个 id 存在多少行,该日期位于开始日期和结束日期之间。
然后该值应包含在新列中 ("sum_of_rows")。
这是我们期望的 table(使用 sum_of_rows 创建变量):
+---+----------+----------+----------+-----------+
| Id| start| end| date|sum_of_rows|
+---+----------+----------+----------+-----------+
| A|2008-01-02|2010-01-01|2009-01-01| 2|
| A|2005-01-02|2012-01-01| null| null|
| A|2013-01-02|2015-01-01|2014-01-01| 1|
| B|2002-01-02|2019-01-01|2003-01-01| 1|
| B|2015-01-02|2017-01-01|2016-01-01| 2|
+---+----------+----------+----------+-----------+
示例:
我们看第一行。取日期“2009-01-01”,想看
在 ID 是行 ID 的所有行中(此处为 A)并计数
日期“2009-01-01”在开始和结束之间的行数(本例中的第 1 行和第 2 行为真)。
原始代码table:
table = spark.createDataFrame(
[
["A", '2008-01-02', '2010-01-01', '2009-01-01'],
["A", '2005-01-02', '2012-01-01', None],
["A", '2013-01-02', '2015-01-01', '2014-01-01'],
["B", '2002-01-02', '2019-01-01', '2003-01-01'],
["B", '2015-01-02', '2017-01-01', '2016-01-01']
],
("Id", "start", "end", "date")
)
此代码有效,但会创建一个 "product" 联接,不建议对大量数据使用此联接。
table2 = table.select(
F.col("id"),
F.col("start").alias("s"),
F.col("end").alias("e"),
)
table3 = table.join(
table2, on="id"
)
table3 = table3.withColumn(
"one",
F.when(
F.col("date").between(F.col("s"),F.col("e")),
1
).otherwise(0)
)
table3.groupBy(
"Id",
"start",
"end",
"date"
).agg(F.sum("one").alias("sum_of_rows")).show()
+---+----------+----------+----------+-----------+
| Id| start| end| date|sum_of_rows|
+---+----------+----------+----------+-----------+
| B|2002-01-02|2019-01-01|2003-01-01| 1|
| B|2015-01-02|2017-01-01|2016-01-01| 2|
| A|2008-01-02|2010-01-01|2009-01-01| 2|
| A|2013-01-02|2015-01-01|2014-01-01| 1|
| A|2005-01-02|2012-01-01| null| 0|
+---+----------+----------+----------+-----------+
我们有一个数据框(数百万行),包括:
- 编号
- 开始日期
- 结束日期
- 日期
对于每一行,我们采用日期变量并希望计算每个 id 存在多少行,该日期位于开始日期和结束日期之间。 然后该值应包含在新列中 ("sum_of_rows")。
这是我们期望的 table(使用 sum_of_rows 创建变量):
+---+----------+----------+----------+-----------+
| Id| start| end| date|sum_of_rows|
+---+----------+----------+----------+-----------+
| A|2008-01-02|2010-01-01|2009-01-01| 2|
| A|2005-01-02|2012-01-01| null| null|
| A|2013-01-02|2015-01-01|2014-01-01| 1|
| B|2002-01-02|2019-01-01|2003-01-01| 1|
| B|2015-01-02|2017-01-01|2016-01-01| 2|
+---+----------+----------+----------+-----------+
示例: 我们看第一行。取日期“2009-01-01”,想看 在 ID 是行 ID 的所有行中(此处为 A)并计数 日期“2009-01-01”在开始和结束之间的行数(本例中的第 1 行和第 2 行为真)。
原始代码table:
table = spark.createDataFrame(
[
["A", '2008-01-02', '2010-01-01', '2009-01-01'],
["A", '2005-01-02', '2012-01-01', None],
["A", '2013-01-02', '2015-01-01', '2014-01-01'],
["B", '2002-01-02', '2019-01-01', '2003-01-01'],
["B", '2015-01-02', '2017-01-01', '2016-01-01']
],
("Id", "start", "end", "date")
)
此代码有效,但会创建一个 "product" 联接,不建议对大量数据使用此联接。
table2 = table.select(
F.col("id"),
F.col("start").alias("s"),
F.col("end").alias("e"),
)
table3 = table.join(
table2, on="id"
)
table3 = table3.withColumn(
"one",
F.when(
F.col("date").between(F.col("s"),F.col("e")),
1
).otherwise(0)
)
table3.groupBy(
"Id",
"start",
"end",
"date"
).agg(F.sum("one").alias("sum_of_rows")).show()
+---+----------+----------+----------+-----------+
| Id| start| end| date|sum_of_rows|
+---+----------+----------+----------+-----------+
| B|2002-01-02|2019-01-01|2003-01-01| 1|
| B|2015-01-02|2017-01-01|2016-01-01| 2|
| A|2008-01-02|2010-01-01|2009-01-01| 2|
| A|2013-01-02|2015-01-01|2014-01-01| 1|
| A|2005-01-02|2012-01-01| null| 0|
+---+----------+----------+----------+-----------+