获取时间戳数据的每周和每天的平均值
Getting weekly and daily averages of timestamp data
我目前在 Spark 数据框中有这样格式的数据:
Timestamp Number
......... ......
M-D-Y 3
M-D-Y 4900
时间戳数据绝不是统一或一致的(即,我可能有一个值出现在 2015 年 3 月 1 日,而 table 中的下一个值是 9 月 1 日的日期, 2015 年……另外,每个日期我可以有多个条目)。
所以我想做两件事
- 计算每周的条目数。所以我基本上想要一个新的 table 来表示时间戳列在该行对应的一周中的行数。如果存在多年,理想情况下我希望对每年的值进行平均以获得单个值。
- 平均每周的数字列。因此,对于一年中的每一周,我都会有一个值代表数字列的平均值(如果该周内没有条目则为 0)。
通过结合 unix_timestamp
和简单的类型转换,使用内置函数解析日期相对容易:
sqlContext.sql(
"SELECT CAST(UNIX_TIMESTAMP('March 1, 2015', 'MMM d, yyyy') AS TIMESTAMP)"
).show(false)
// +---------------------+
// |_c0 |
// +---------------------+
// |2015-03-01 00:00:00.0|
// +---------------------+
使用 DataFrame
DSL 等效代码将是这样的:
import org.apache.spark.sql.functions.unix_timestamp
unix_timestamp($"date", "MMM d, yyyy").cast("timestamp")
要填补缺失的条目,您可以使用不同的技巧。最简单的方法是使用与上述相同的解析逻辑。首先让我们创建一些助手:
def leap(year: Int) = {
((year % 4 == 0) && (year % 100 != 0)) || (year % 400 == 0)
}
def weeksForYear(year: Int) = (1 to 52).map(w => s"$year $w")
def daysForYear(year: Int) = (1 to { if(leap(2000)) 366 else 366 }).map(
d => s"$year $d"
)
和示例参考数据(这里有几周,但你可以几天做同样的事情):
import org.apache.spark.sql.functions.{year, weekofyear}'
val exprs = Seq(year($"date").alias("year"), weekofyear($"date").alias("week"))
val weeks2015 = Seq(2015)
.flatMap(weeksForYear _)
.map(Tuple1.apply)
.toDF("date")
.withColumn("date", unix_timestamp($"date", "yyyy w").cast("timestamp"))
.select(exprs: _*)
终于可以转换原始数据了:
val df = Seq(
("March 1, 2015", 3), ("September 1, 2015", 4900)).toDF("Timestamp", "Number")
val dfParsed = df
.withColumn("date", unix_timestamp($"timestamp", "MMM d, yyyy").cast("timestamp"))
.select(exprs :+ $"Number": _*)
合并和聚合:
weeks2015.join(dfParsed, Seq("year", "week"), "left")
.groupBy($"year", $"week")
.agg(count($"Number"), avg($"Number"))
.na.fill(0)
我目前在 Spark 数据框中有这样格式的数据:
Timestamp Number
......... ......
M-D-Y 3
M-D-Y 4900
时间戳数据绝不是统一或一致的(即,我可能有一个值出现在 2015 年 3 月 1 日,而 table 中的下一个值是 9 月 1 日的日期, 2015 年……另外,每个日期我可以有多个条目)。
所以我想做两件事
- 计算每周的条目数。所以我基本上想要一个新的 table 来表示时间戳列在该行对应的一周中的行数。如果存在多年,理想情况下我希望对每年的值进行平均以获得单个值。
- 平均每周的数字列。因此,对于一年中的每一周,我都会有一个值代表数字列的平均值(如果该周内没有条目则为 0)。
通过结合 unix_timestamp
和简单的类型转换,使用内置函数解析日期相对容易:
sqlContext.sql(
"SELECT CAST(UNIX_TIMESTAMP('March 1, 2015', 'MMM d, yyyy') AS TIMESTAMP)"
).show(false)
// +---------------------+
// |_c0 |
// +---------------------+
// |2015-03-01 00:00:00.0|
// +---------------------+
使用 DataFrame
DSL 等效代码将是这样的:
import org.apache.spark.sql.functions.unix_timestamp
unix_timestamp($"date", "MMM d, yyyy").cast("timestamp")
要填补缺失的条目,您可以使用不同的技巧。最简单的方法是使用与上述相同的解析逻辑。首先让我们创建一些助手:
def leap(year: Int) = {
((year % 4 == 0) && (year % 100 != 0)) || (year % 400 == 0)
}
def weeksForYear(year: Int) = (1 to 52).map(w => s"$year $w")
def daysForYear(year: Int) = (1 to { if(leap(2000)) 366 else 366 }).map(
d => s"$year $d"
)
和示例参考数据(这里有几周,但你可以几天做同样的事情):
import org.apache.spark.sql.functions.{year, weekofyear}'
val exprs = Seq(year($"date").alias("year"), weekofyear($"date").alias("week"))
val weeks2015 = Seq(2015)
.flatMap(weeksForYear _)
.map(Tuple1.apply)
.toDF("date")
.withColumn("date", unix_timestamp($"date", "yyyy w").cast("timestamp"))
.select(exprs: _*)
终于可以转换原始数据了:
val df = Seq(
("March 1, 2015", 3), ("September 1, 2015", 4900)).toDF("Timestamp", "Number")
val dfParsed = df
.withColumn("date", unix_timestamp($"timestamp", "MMM d, yyyy").cast("timestamp"))
.select(exprs :+ $"Number": _*)
合并和聚合:
weeks2015.join(dfParsed, Seq("year", "week"), "left")
.groupBy($"year", $"week")
.agg(count($"Number"), avg($"Number"))
.na.fill(0)