Counting concurrent user sessions by time intervals
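My data consists of sessions with start and end timestamps. My task is to count the number of sessions that are 'active' in each time interval, by company and app version. I'm starting with 30-minute intervals. So if a company has a session that lasts from 2:10pm to 3:35pm, that company would be counted in 4 bins/intervals (2:00, 2:30, 3:00, 3:30). How would I approach this in Spark/Scala?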
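The binning rule above can be sketched in plain Scala (no Spark) as follows. The object and function names are hypothetical. The sketch makes three assumptions:

- timestamps are UTC instants;
- bins are aligned to multiples of the bin width since the Unix epoch;
- the end is inclusive, so a session ending exactly on a bin boundary also lands in that bin.

```scala
import java.time.{Duration, Instant}

object BinExample {
  // Hypothetical helper: the start of every bin a session falls into. Bins are aligned to
  // multiples of `width` since the Unix epoch (so 30-minute bins start at :00 and :30 UTC).
  def binStarts(start: Instant, end: Instant, width: Duration): List[Instant] = {
    val w = width.getSeconds
    val first = Math.floorDiv(start.getEpochSecond, w) * w // round the start down to its bin
    // Step one bin at a time up to and including the end time
    (first to end.getEpochSecond by w).map(s => Instant.ofEpochSecond(s)).toList
  }

  def main(args: Array[String]): Unit = {
    // The 2:10pm to 3:35pm session from the question (date chosen arbitrarily)
    binStarts(
      Instant.parse("2020-07-01T14:10:00Z"),
      Instant.parse("2020-07-01T15:35:00Z"),
      Duration.ofMinutes(30)
    ).foreach(println)
  }
}
```

For that session the function returns the four bin starts 14:00, 14:30, 15:00 and 15:30, matching the four intervals described in the question.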
Eventually, I need this to scale to millions of sessions per day.
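At that scale, a sweep-line (prefix-sum) count is one alternative worth knowing about. It is a different technique from the explode-based answer below.

- Each session contributes only two events: +1 in its first bin and -1 in the bin after its last one.
- A running sum over the bins then gives the number of active sessions per bin.

The sketch below is plain Scala for a single customer/device_model group. `SweepCount` and `activePerBin` are hypothetical names. Bins are assumed to be integer indices such as floor(epochSeconds / widthSeconds), with firstBin <= lastBin for every session.

```scala
import scala.collection.immutable.SortedMap

object SweepCount {
  // sessions: (firstBin, lastBin) pairs of integer bin indices for one customer/device_model group,
  // e.g. floor(epochSeconds / widthSeconds) of the start and end timestamps; assumes first <= last.
  def activePerBin(sessions: Seq[(Long, Long)]): SortedMap[Long, Int] =
    if (sessions.isEmpty) SortedMap.empty[Long, Int]
    else {
      // Two events per session: +1 entering its first bin, -1 in the bin after its last one
      val deltas: Map[Long, Int] =
        sessions
          .flatMap { case (first, last) => Seq(first -> 1, (last + 1) -> -1) }
          .groupBy(_._1)
          .map { case (bin, events) => bin -> events.map(_._2).sum }
      val lo = sessions.map(_._1).min
      val hi = sessions.map(_._2).max
      val bins = (lo to hi).toList
      // Running sum of the deltas = number of sessions active in each bin
      val active = bins.scanLeft(0)((acc, bin) => acc + deltas.getOrElse(bin, 0)).tail
      SortedMap(bins.zip(active): _*)
    }

  def main(args: Array[String]): Unit =
    // Three sessions covering bins 0-3, 1-2 and 3-3
    println(activePerBin(Seq((0L, 3L), (1L, 2L), (3L, 3L))))
}
```

Unlike the explode approach, the number of events per session does not grow as the bins get narrower, although the output still has one entry per bin. This sketch reports 0 for empty bins between the first and last event, whereas the explode approach simply omits them.

Carrying this over to Spark would presumably mean two pieces, neither shown or tested here:

- a window `sum` ordered by bin within each customer/device_model partition;
- a generated bin grid to fill bins that have no events.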
Here is my sample data:
val df = sc.parallelize(List(
  ("Company B","xi2", "2020-07-02T01:07:00.000+0000", "2020-07-02T02:29:00.000+0000"),
  ("Company A","xi1", "2020-07-01T23:55:00.000+0000", "2020-07-02T01:17:00.000+0000"),
  ("Company B","xi2", "2020-07-01T22:31:00.000+0000", "2020-07-01T23:53:00.000+0000"),
  ("Company B","xi1", "2020-07-01T23:07:00.000+0000", "2020-07-02T00:29:00.000+0000"),
  ("Company A","xi1", "2020-07-01T22:19:00.000+0000", "2020-07-01T23:41:00.000+0000"),
  ("Company B","xi1", "2020-07-02T00:07:00.000+0000", "2020-07-02T01:29:00.000+0000"),
  ("Company B","xi1", "2020-07-02T00:55:00.000+0000", "2020-07-02T02:17:00.000+0000"),
  ("Company A","xi1", "2020-07-02T00:19:00.000+0000", "2020-07-02T01:41:00.000+0000"),
  ("Company A","xi2", "2020-07-01T22:55:00.000+0000", "2020-07-02T00:17:00.000+0000"),
  ("Company B","xi2", "2020-07-02T00:43:00.000+0000", "2020-07-02T02:05:00.000+0000"),
  ("Company A","xi2", "2020-07-01T23:31:00.000+0000", "2020-07-02T00:53:00.000+0000"),
  ("Company B","xi1", "2020-07-01T23:19:00.000+0000", "2020-07-02T00:41:00.000+0000"),
  ("Company A","xi2", "2020-07-01T23:43:00.000+0000", "2020-07-02T01:05:00.000+0000"),
  ("Company A","xi2", "2020-07-02T00:31:00.000+0000", "2020-07-02T01:53:00.000+0000"),
  ("Company A","xi2", "2020-07-01T22:43:00.000+0000", "2020-07-02T00:05:00.000+0000")
)).toDF("customer","device_model","start_timestamp","end_timestamp")
  .withColumn("start_timestamp", to_timestamp($"start_timestamp"))
  .withColumn("end_timestamp", to_timestamp($"end_timestamp"))
display(df)
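I'd like my results to look like the following. These counts use 30-minute intervals, but eventually I'll be counting at intervals as small as one or two minutes.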
timeinterval         customer   xi1  xi2
2020-07-01 22:30:00  Company A  1    1
2020-07-01 22:30:00  Company B  0    1
2020-07-01 23:00:00  Company A  1    2
2020-07-01 23:00:00  Company B  1    1
2020-07-01 23:30:00  Company A  1    4
2020-07-01 23:30:00  Company B  2    1
2020-07-02 00:00:00  Company A  1    4
2020-07-02 00:00:00  Company B  3    1
2020-07-02 00:30:00  Company A  2    4
2020-07-02 00:30:00  Company B  3    1
2020-07-02 01:00:00  Company A  2    3
2020-07-02 01:00:00  Company B  2    2
2020-07-02 01:30:00  Company A  2    1
2020-07-02 01:30:00  Company B  2    2
2020-07-02 02:00:00  Company A  0    1
2020-07-02 02:00:00  Company B  1    2
2020-07-02 02:30:00  Company B  1    1
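Narrower bins multiply the work of any approach that emits one row per session per bin. The hypothetical helper below counts how many bins, and therefore exploded rows, a single session touches. It assumes epoch-aligned bins with an inclusive end, matching the floor/sequence logic in the answer below.

```scala
import java.time.{Duration, Instant}

object BinCount {
  // Rows explode() would produce for one session: index of the bin containing `end`
  // minus index of the bin containing `start`, plus one (epoch-aligned bins, inclusive end).
  def binCount(start: Instant, end: Instant, width: Duration): Long = {
    val w = width.getSeconds
    Math.floorDiv(end.getEpochSecond, w) - Math.floorDiv(start.getEpochSecond, w) + 1
  }

  def main(args: Array[String]): Unit = {
    // An 82-minute session from the sample data (Company A, xi1)
    val start = Instant.parse("2020-07-01T22:19:00Z")
    val end   = Instant.parse("2020-07-01T23:41:00Z")
    for (minutes <- Seq(30, 2, 1))
      println(s"$minutes-minute bins: ${binCount(start, end, Duration.ofMinutes(minutes))} rows")
  }
}
```

For this session the helper gives 4 rows at 30 minutes, 42 at 2 minutes and 83 at 1 minute. Moving from 30-minute to 1-minute bins therefore makes the exploded data roughly 20 times larger for sessions of this length.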
Any help or ideas on the best approach would be greatly appreciated.
Perhaps this is helpful:
Load the provided test data
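import org.apache.spark.sql.functions._
import spark.implicits._ // enables $"col" and toDF; shells and notebooks may already import these

val df = spark.sparkContext.parallelize(List(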
("Company B","xi2", "2020-07-02T01:07:00.000+0000", "2020-07-02T02:29:00.000+0000"),
("Company A","xi1", "2020-07-01T23:55:00.000+0000", "2020-07-02T01:17:00.000+0000"),
("Company B","xi2", "2020-07-01T22:31:00.000+0000", "2020-07-01T23:53:00.000+0000"),
("Company B","xi1", "2020-07-01T23:07:00.000+0000", "2020-07-02T00:29:00.000+0000"),
("Company A","xi1", "2020-07-01T22:19:00.000+0000", "2020-07-01T23:41:00.000+0000"),
("Company B","xi1", "2020-07-02T00:07:00.000+0000", "2020-07-02T01:29:00.000+0000"),
("Company B","xi1", "2020-07-02T00:55:00.000+0000", "2020-07-02T02:17:00.000+0000"),
("Company A","xi1", "2020-07-02T00:19:00.000+0000", "2020-07-02T01:41:00.000+0000"),
("Company A","xi2", "2020-07-01T22:55:00.000+0000", "2020-07-02T00:17:00.000+0000"),
("Company B","xi2", "2020-07-02T00:43:00.000+0000", "2020-07-02T02:05:00.000+0000"),
("Company A","xi2", "2020-07-01T23:31:00.000+0000", "2020-07-02T00:53:00.000+0000"),
("Company B","xi1", "2020-07-01T23:19:00.000+0000", "2020-07-02T00:41:00.000+0000"),
("Company A","xi2", "2020-07-01T23:43:00.000+0000", "2020-07-02T01:05:00.000+0000"),
("Company A","xi2", "2020-07-02T00:31:00.000+0000", "2020-07-02T01:53:00.000+0000"),
("Company A","xi2", "2020-07-01T22:43:00.000+0000", "2020-07-02T00:05:00.000+0000") ))
.toDF("customer","device_model","start_timestamp","end_timestamp")
.withColumn("start_timestamp", to_timestamp($"start_timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
.withColumn("end_timestamp", to_timestamp($"end_timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
df.show(false)
/**
* +---------+------------+-------------------+-------------------+
* |customer |device_model|start_timestamp |end_timestamp |
* +---------+------------+-------------------+-------------------+
* |Company B|xi2 |2020-07-02 06:37:00|2020-07-02 07:59:00|
* |Company A|xi1 |2020-07-02 05:25:00|2020-07-02 06:47:00|
* |Company B|xi2 |2020-07-02 04:01:00|2020-07-02 05:23:00|
* |Company B|xi1 |2020-07-02 04:37:00|2020-07-02 05:59:00|
* |Company A|xi1 |2020-07-02 03:49:00|2020-07-02 05:11:00|
* |Company B|xi1 |2020-07-02 05:37:00|2020-07-02 06:59:00|
* |Company B|xi1 |2020-07-02 06:25:00|2020-07-02 07:47:00|
* |Company A|xi1 |2020-07-02 05:49:00|2020-07-02 07:11:00|
* |Company A|xi2 |2020-07-02 04:25:00|2020-07-02 05:47:00|
* |Company B|xi2 |2020-07-02 06:13:00|2020-07-02 07:35:00|
* |Company A|xi2 |2020-07-02 05:01:00|2020-07-02 06:23:00|
* |Company B|xi1 |2020-07-02 04:49:00|2020-07-02 06:11:00|
* |Company A|xi2 |2020-07-02 05:13:00|2020-07-02 06:35:00|
* |Company A|xi2 |2020-07-02 06:01:00|2020-07-02 07:23:00|
* |Company A|xi2 |2020-07-02 04:13:00|2020-07-02 05:35:00|
* +---------+------------+-------------------+-------------------+
*/
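The timestamps printed above are 5 h 30 min later than the UTC values in the input: 22:19 UTC appears as 03:49 the next day.

- `df.show()` renders timestamps in the Spark session time zone (`spark.sql.session.timeZone`). The answer was apparently run with a UTC+05:30 session zone.
- The underlying instants, bins and counts are unchanged; only the display differs.
- Running `spark.conf.set("spark.sql.session.timeZone", "UTC")` before `show()` should print the same UTC wall-clock times as the question's data.

The plain java.time sketch below parses a sample value with the same pattern and renders it both ways. `TimeZoneCheck` is a hypothetical name. The +05:30 offset is inferred from the output, not stated in the answer.

```scala
import java.time.{OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object TimeZoneCheck {
  // Same pattern the answer passes to to_timestamp; 'Z' matches offsets written as +0000
  private val in  = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
  private val out = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Render an input timestamp string as wall-clock time at the given offset
  def render(raw: String, offset: ZoneOffset): String =
    OffsetDateTime.parse(raw, in).atZoneSameInstant(offset).format(out)

  def main(args: Array[String]): Unit = {
    val raw = "2020-07-01T22:19:00.000+0000" // Company A, xi1 in the sample data
    println(render(raw, ZoneOffset.UTC))                   // wall-clock time in UTC
    println(render(raw, ZoneOffset.ofHoursMinutes(5, 30))) // wall-clock time at +05:30
  }
}
```

The bins are aligned to epoch seconds, so they fall on UTC :00 and :30 boundaries. With a +05:30 offset these also look like local half-hours. In a zone with an offset such as +05:45, the displayed bin starts would not.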
sequence + interval
Generate the bins/intervals
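// change intervalInMinutes as required
val intervalInMinutes = 30
val seconds = intervalInMinutes * 60 // bin width in seconds
// Round each start down to the beginning of its bin: epoch seconds floored to a multiple of `seconds`
val p = df.withColumn("new_start", to_timestamp(floor($"start_timestamp".cast("long") / seconds) * seconds))
  // Array of bin starts from new_start up to and including end_timestamp, one step per interval
  .withColumn("splits", sequence(
    $"new_start",
    $"end_timestamp",
    expr(s"interval $intervalInMinutes MINUTE")))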
p.show(false)
/**
* +---------+------------+-------------------+-------------------+-------------------+------------------------------------------------------------------------------------+
* |customer |device_model|start_timestamp |end_timestamp |new_start |splits |
* +---------+------------+-------------------+-------------------+-------------------+------------------------------------------------------------------------------------+
* |Company B|xi2 |2020-07-02 06:37:00|2020-07-02 07:59:00|2020-07-02 06:30:00|[2020-07-02 06:30:00, 2020-07-02 07:00:00, 2020-07-02 07:30:00] |
* |Company A|xi1 |2020-07-02 05:25:00|2020-07-02 06:47:00|2020-07-02 05:00:00|[2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00]|
* |Company B|xi2 |2020-07-02 04:01:00|2020-07-02 05:23:00|2020-07-02 04:00:00|[2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00] |
* |Company B|xi1 |2020-07-02 04:37:00|2020-07-02 05:59:00|2020-07-02 04:30:00|[2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00] |
* |Company A|xi1 |2020-07-02 03:49:00|2020-07-02 05:11:00|2020-07-02 03:30:00|[2020-07-02 03:30:00, 2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00]|
* |Company B|xi1 |2020-07-02 05:37:00|2020-07-02 06:59:00|2020-07-02 05:30:00|[2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00] |
* |Company B|xi1 |2020-07-02 06:25:00|2020-07-02 07:47:00|2020-07-02 06:00:00|[2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00, 2020-07-02 07:30:00]|
* |Company A|xi1 |2020-07-02 05:49:00|2020-07-02 07:11:00|2020-07-02 05:30:00|[2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00]|
* |Company A|xi2 |2020-07-02 04:25:00|2020-07-02 05:47:00|2020-07-02 04:00:00|[2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00]|
* |Company B|xi2 |2020-07-02 06:13:00|2020-07-02 07:35:00|2020-07-02 06:00:00|[2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00, 2020-07-02 07:30:00]|
* |Company A|xi2 |2020-07-02 05:01:00|2020-07-02 06:23:00|2020-07-02 05:00:00|[2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00] |
* |Company B|xi1 |2020-07-02 04:49:00|2020-07-02 06:11:00|2020-07-02 04:30:00|[2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00]|
* |Company A|xi2 |2020-07-02 05:13:00|2020-07-02 06:35:00|2020-07-02 05:00:00|[2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00]|
* |Company A|xi2 |2020-07-02 06:01:00|2020-07-02 07:23:00|2020-07-02 06:00:00|[2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00] |
* |Company A|xi2 |2020-07-02 04:13:00|2020-07-02 05:35:00|2020-07-02 04:00:00|[2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00]|
* +---------+------------+-------------------+-------------------+-------------------+------------------------------------------------------------------------------------+
*/
pivot+count
Get the count for each interval
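// One row per (session, bin): explode turns each element of "splits" into its own row
p.select($"customer", $"device_model", explode($"splits").as("timeinterval"))
  .groupBy("timeinterval", "customer")
  // One output column per device_model value (xi1, xi2)
  .pivot("device_model")
  .agg(
    count("device_model")
  )
  // Bin/customer combinations with no session for a model may be null; coalesce turns them into 0
  .withColumn("xi1", coalesce($"xi1", lit(0)))
  .withColumn("xi2", coalesce($"xi2", lit(0)))
  .orderBy("timeinterval", "customer")
  .show(false)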
/**
* +-------------------+---------+---+---+
* |timeinterval |customer |xi1|xi2|
* +-------------------+---------+---+---+
* |2020-07-02 03:30:00|Company A|1 |0 |
* |2020-07-02 04:00:00|Company A|1 |2 |
* |2020-07-02 04:00:00|Company B|0 |1 |
* |2020-07-02 04:30:00|Company A|1 |2 |
* |2020-07-02 04:30:00|Company B|2 |1 |
* |2020-07-02 05:00:00|Company A|2 |4 |
* |2020-07-02 05:00:00|Company B|2 |1 |
* |2020-07-02 05:30:00|Company A|2 |4 |
* |2020-07-02 05:30:00|Company B|3 |0 |
* |2020-07-02 06:00:00|Company A|2 |3 |
* |2020-07-02 06:00:00|Company B|3 |1 |
* |2020-07-02 06:30:00|Company A|2 |2 |
* |2020-07-02 06:30:00|Company B|2 |2 |
* |2020-07-02 07:00:00|Company A|1 |1 |
* |2020-07-02 07:00:00|Company B|1 |2 |
* |2020-07-02 07:30:00|Company B|1 |2 |
* +-------------------+---------+---+---+
*/
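To sanity-check the Spark output on a small sample, for example in a unit test, a plain-Scala reference implementation of the same explode, groupBy and pivot steps can help. `ReferenceCounts`, `Session`, `counts` and `pivot` are hypothetical names, not part of the answer. The sketch assumes UTC instants and the same inclusive end as `sequence()`.

On the Spark side, passing the pivot values explicitly, as in `.pivot("device_model", Seq("xi1", "xi2"))`, lets Spark skip the extra pass it otherwise needs to compute the distinct values.

```scala
import java.time.{Duration, Instant}

object ReferenceCounts {
  final case class Session(customer: String, deviceModel: String, start: Instant, end: Instant)

  // Local equivalent of explode(splits) + groupBy(timeinterval, customer, device_model) + count:
  // every session contributes one row per epoch-aligned bin from its floored start up to its end (inclusive).
  def counts(sessions: Seq[Session], width: Duration): Map[(Instant, String, String), Int] = {
    val w = width.getSeconds
    val exploded = for {
      s   <- sessions
      bin <- (Math.floorDiv(s.start.getEpochSecond, w) * w) to s.end.getEpochSecond by w
    } yield (Instant.ofEpochSecond(bin), s.customer, s.deviceModel)
    exploded.groupBy(identity).map { case (key, rows) => key -> rows.size }
  }

  // Local equivalent of the pivot: one row per (bin, customer), one count per device model,
  // with 0 where a model has no session in that bin (like the coalesce(..., lit(0)) calls).
  def pivot(c: Map[(Instant, String, String), Int], models: Seq[String]): Seq[(Instant, String, Seq[Int])] =
    c.keys.map { case (bin, customer, _) => (bin, customer) }.toSeq.distinct
      .sortBy { case (bin, customer) => (bin.getEpochSecond, customer) }
      .map { case (bin, customer) => (bin, customer, models.map(m => c.getOrElse((bin, customer, m), 0))) }
}
```

Feeding a handful of the sample sessions (as UTC instants) through `counts` and `pivot` gives rows that can be compared directly with the Spark result once both use the same time zone.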