阿帕奇火花 |具体时间范围聚合
Apache Spark | specific time frame aggregation
我需要一种每周聚合数据集的方法。这是我的数据集
| date|organization_id|media_package_id|event_uuid |
+----------+---------------+----------------+-----------+
|2016-10-25| 1| 11| 76304d|
|2016-10-25| 1| 11| e6285b|
|2016-10-22| 2| 21| 16c04d|
|2016-10-22| 2| 21| 17804d|
|2016-10-22| 2| 21| 18904x|
|2016-10-21| 2| 21| 51564q|
|2016-10-07| 4| 98| 12874t|
|2016-10-05| 4| 98| 11234d|
+----------+---------------+----------------+-----------+
让我们假设 Spark 作业每天 运行 以获得所需的聚合结果。我想要以一周为基础的结果,例如聚合后的上述数据集。
| date|organization_id|media_package_id| count|
+----------+---------------+----------------+-----------+
|2016-10-24| 1| 11| 2|
|2016-10-17| 2| 21| 4|
|2016-10-03| 4| 98| 2|
+----------+---------------+----------------+-----------+
在这里,如果您看到日期列,它是一周的第一天(我认为这是最好的方式)
我以某种方式设法每天进行汇总。这是我的做法
val data = MongoSupport.load(spark, "sampleCollection")
val dataForDates = data.filter(dataForDates("date").isin(dates : _*))
val countByDate = proofEventsForDates.groupBy("DATE", "ORGANIZATION_ID", "MEDIA_PACKAGE_ID")
.agg(count("EVENT_UUID").as("COUNT"))
val finalResult = impressionsByDate
.select(
col("DATE").as("date"),
col("ORGANIZATION_ID").as("organization_id"),
col("MEDIA_PACKAGE_ID").as("media_package_id"),
col("COUNT").as("count")
)
在这里,在开始过滤数据集时,我传递了一个特殊的 dates
列表,其中包含至少一个月左右的日期。我得到的结果是(这不是我想要的)
| date|organization_id|media_package_id| count|
+----------+---------------+----------------+-----------+
|2016-10-25| 1| 11| 2|
|2016-10-22| 2| 21| 3|
|2016-10-21| 2| 21| 1|
|2016-10-07| 2| 21| 1|
|2016-10-05| 2| 21| 1|
+----------+---------------+----------------+-----------+
从这里开始,我不知道如何每周汇总此数据集。
假设您的 date
列已经是 class date
,您可以使用函数 year()
和 weekofyear()
提取缺少的分组列聚合。
import org.apache.spark.sql.functions.weekofyear
import org.apache.spark.sql.functions.year
(df
.withColumn("week_nr", weekofyear($"date"))
.withColumn("year", year($"date"))
.groupBy("year",
"week_nr",
"organization_id",
"media_package_id")
.count().orderBy(desc("week_nr"))).show
+----+-------+---------------+----------------+-----+
|year|week_nr|organization_id|media_package_id|count|
+----+-------+---------------+----------------+-----+
|2016| 43| 1| 11| 2|
|2016| 42| 2| 21| 4|
|2016| 40| 4| 98| 2|
+----+-------+---------------+----------------+-----+
我需要一种每周聚合数据集的方法。这是我的数据集
| date|organization_id|media_package_id|event_uuid |
+----------+---------------+----------------+-----------+
|2016-10-25| 1| 11| 76304d|
|2016-10-25| 1| 11| e6285b|
|2016-10-22| 2| 21| 16c04d|
|2016-10-22| 2| 21| 17804d|
|2016-10-22| 2| 21| 18904x|
|2016-10-21| 2| 21| 51564q|
|2016-10-07| 4| 98| 12874t|
|2016-10-05| 4| 98| 11234d|
+----------+---------------+----------------+-----------+
让我们假设 Spark 作业每天 运行 以获得所需的聚合结果。我想要以一周为基础的结果,例如聚合后的上述数据集。
| date|organization_id|media_package_id| count|
+----------+---------------+----------------+-----------+
|2016-10-24| 1| 11| 2|
|2016-10-17| 2| 21| 4|
|2016-10-03| 4| 98| 2|
+----------+---------------+----------------+-----------+
在这里,如果您看到日期列,它是一周的第一天(我认为这是最好的方式)
我以某种方式设法每天进行汇总。这是我的做法
val data = MongoSupport.load(spark, "sampleCollection")
val dataForDates = data.filter(dataForDates("date").isin(dates : _*))
val countByDate = proofEventsForDates.groupBy("DATE", "ORGANIZATION_ID", "MEDIA_PACKAGE_ID")
.agg(count("EVENT_UUID").as("COUNT"))
val finalResult = impressionsByDate
.select(
col("DATE").as("date"),
col("ORGANIZATION_ID").as("organization_id"),
col("MEDIA_PACKAGE_ID").as("media_package_id"),
col("COUNT").as("count")
)
在这里,在开始过滤数据集时,我传递了一个特殊的 dates
列表,其中包含至少一个月左右的日期。我得到的结果是(这不是我想要的)
| date|organization_id|media_package_id| count|
+----------+---------------+----------------+-----------+
|2016-10-25| 1| 11| 2|
|2016-10-22| 2| 21| 3|
|2016-10-21| 2| 21| 1|
|2016-10-07| 2| 21| 1|
|2016-10-05| 2| 21| 1|
+----------+---------------+----------------+-----------+
从这里开始,我不知道如何每周汇总此数据集。
假设您的 date
列已经是 class date
,您可以使用函数 year()
和 weekofyear()
提取缺少的分组列聚合。
import org.apache.spark.sql.functions.weekofyear
import org.apache.spark.sql.functions.year
(df
.withColumn("week_nr", weekofyear($"date"))
.withColumn("year", year($"date"))
.groupBy("year",
"week_nr",
"organization_id",
"media_package_id")
.count().orderBy(desc("week_nr"))).show
+----+-------+---------------+----------------+-----+
|year|week_nr|organization_id|media_package_id|count|
+----+-------+---------------+----------------+-----+
|2016| 43| 1| 11| 2|
|2016| 42| 2| 21| 4|
|2016| 40| 4| 98| 2|
+----+-------+---------------+----------------+-----+