SparkSQL: conditional sum on range of dates
I have a dataframe like this:

| id | prodId | date       | value |
|----|--------|------------|-------|
| 1  | a      | 2015-01-01 | 100   |
| 2  | a      | 2015-01-02 | 150   |
| 3  | a      | 2015-01-03 | 120   |
| 4  | b      | 2015-01-01 | 100   |
and I would like to do a groupBy on prodId and aggregate 'value', summing it over ranges of dates. In other words, I need to build a table with the following columns:

- prodId
- val_1: sum of value if date is between date1 and date2
- val_2: sum of value if date is between date2 and date3
- val_3: same as above
- and so on
| prodId | val_1 (01-01 to 01-02) | val_2 (01-03 to 01-04) |
|--------|------------------------|------------------------|
| a      | 250                    | 120                    |
| b      | 100                    | 0                      |
Is there any predefined aggregate function in Spark that allows doing conditional sums like this? Or would you recommend developing an aggregation UDF (and if so, any suggestions)?
Thanks a lot!
First, let's recreate the example dataset:
import org.apache.spark.sql.functions.to_date
val df = sc.parallelize(Seq(
  (1, "a", "2015-01-01", 100), (2, "a", "2015-01-02", 150),
  (3, "a", "2015-01-03", 120), (4, "b", "2015-01-01", 100)
)).toDF("id", "prodId", "date", "value").withColumn("date", to_date($"date"))
val dates = List(("2015-01-01", "2015-01-02"), ("2015-01-03", "2015-01-04"))
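(The snippet above assumes spark-shell, where sc, the $ syntax, and toDF are available out of the box. In a standalone Spark 2.x application you would build the same dataframe from your own SparkSession; a minimal sketch, where the app name and master setting are placeholders:)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.to_date

// Assumption: standalone app on Spark 2.x rather than spark-shell.
val spark = SparkSession.builder().appName("conditional-sums").master("local[*]").getOrCreate()
import spark.implicits._  // enables toDF and the $"col" syntax

val df = Seq(
  (1, "a", "2015-01-01", 100), (2, "a", "2015-01-02", 150),
  (3, "a", "2015-01-03", 120), (4, "b", "2015-01-01", 100)
).toDF("id", "prodId", "date", "value")
  .withColumn("date", to_date($"date"))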
All you need is something like this:
import org.apache.spark.sql.functions.{when, lit, sum}
val exprs = dates.map {
  case (x, y) => {
    // Create label for a column name
    val alias = s"${x}_${y}".replace("-", "_")
    // Convert strings to dates
    val xd = to_date(lit(x))
    val yd = to_date(lit(y))
    // Generate expression equivalent to
    // SUM(
    //   CASE
    //     WHEN date BETWEEN ... AND ... THEN value
    //     ELSE 0
    //   END
    // ) AS ...
    // for each pair of dates.
    sum(when($"date".between(xd, yd), $"value").otherwise(0)).alias(alias)
  }
}
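Each element of exprs is an ordinary Column, so if you find SQL text easier to read, the same list can be built with expr; a sketch under the same column names (it relies on Spark's implicit cast between the date column and the string literals in BETWEEN):

import org.apache.spark.sql.functions.expr

// Same conditional sums, written as SQL strings instead of the Column DSL.
val exprsSql = dates.map { case (x, y) =>
  val alias = s"${x}_${y}".replace("-", "_")
  expr(s"SUM(CASE WHEN date BETWEEN '$x' AND '$y' THEN value ELSE 0 END)").alias(alias)
}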
df.groupBy($"prodId").agg(exprs.head, exprs.tail: _*).show
// +------+---------------------+---------------------+
// |prodId|2015_01_01_2015_01_02|2015_01_03_2015_01_04|
// +------+---------------------+---------------------+
// | a| 250| 120|
// | b| 100| 0|
// +------+---------------------+---------------------+
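For completeness, the same aggregation can also be expressed in plain SQL over a temporary view; a minimal sketch, assuming Spark 2.x (createOrReplaceTempView) and a SparkSession named spark, with the view name sales chosen here just for illustration:

// Register the dataframe under an illustrative view name.
df.createOrReplaceTempView("sales")

spark.sql("""
  SELECT prodId,
         SUM(CASE WHEN date BETWEEN '2015-01-01' AND '2015-01-02' THEN value ELSE 0 END) AS val_1,
         SUM(CASE WHEN date BETWEEN '2015-01-03' AND '2015-01-04' THEN value ELSE 0 END) AS val_2
  FROM sales
  GROUP BY prodId
""").show()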