Counting concurrent user sessions by time intervals

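My data consists of sessions, each with a start and an end timestamp. My task is to count the number of sessions 'active' in each time interval, by company and application version (the device_model column). I'm starting with 30-minute intervals, so if a company has a session that runs from 2:10pm to 3:35pm, that session counts toward 4 bins/intervals for that company (2:00, 2:30, 3:00, 3:30). How would I approach this in Spark/Scala?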
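The binning rule described above (floor the session start to the beginning of its interval, then step forward one interval at a time until the next step would pass the session end) can be sketched in plain Scala with java.time. This is only an illustration of the rule, not Spark code; binsFor is a hypothetical helper name, and the date 2020-07-02 is an arbitrary choice for the 2:10pm to 3:35pm example.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper: every interval start that a session [start, end] touches.
def binsFor(start: Instant, end: Instant, interval: Duration): List[Instant] = {
  val step = interval.getSeconds
  // Floor the start to the beginning of its interval (epoch-seconds arithmetic).
  val first = Instant.ofEpochSecond(Math.floorDiv(start.getEpochSecond, step) * step)
  // Step forward while the bin start is not after the session end (end inclusive).
  Iterator.iterate(first)(_.plusSeconds(step)).takeWhile(!_.isAfter(end)).toList
}

val bins = binsFor(
  Instant.parse("2020-07-02T14:10:00Z"),
  Instant.parse("2020-07-02T15:35:00Z"),
  Duration.ofMinutes(30)
)
bins.foreach(println) // 14:00, 14:30, 15:00 and 15:30 (as UTC instants)
```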

Ultimately, I need this to scale to millions of sessions per day.
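Any approach that produces one row per session per interval it overlaps multiplies the row count by roughly the session length divided by the interval width, which matters once the interval drops to one or two minutes. A small plain-Scala sketch of that count (binCount is a hypothetical helper; times are minutes since midnight to keep the arithmetic visible), using an 82-minute session from 01:07 to 02:29:

```scala
// Hypothetical helper: number of interval starts from floor(start) up to end, inclusive.
def binCount(startMin: Int, endMin: Int, intervalMin: Int): Int = {
  val firstBin = (startMin / intervalMin) * intervalMin // integer division floors non-negative values
  (endMin - firstBin) / intervalMin + 1
}

val start = 1 * 60 + 7  // 01:07
val end   = 2 * 60 + 29 // 02:29
for (w <- Seq(30, 2, 1))
  println(s"$w-minute intervals: ${binCount(start, end, w)} rows for this session")
```

For this session that is 3 rows at 30-minute intervals, 42 at 2 minutes and 83 at 1 minute, so at 1-minute granularity the exploded data is roughly the average session length in minutes times the number of sessions.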

Here is my sample data:

import org.apache.spark.sql.functions._
import spark.implicits._
val df = sc.parallelize(List( ("Company B","xi2", "2020-07-02T01:07:00.000+0000", "2020-07-02T02:29:00.000+0000"), ("Company A","xi1", "2020-07-01T23:55:00.000+0000", "2020-07-02T01:17:00.000+0000"), ("Company B","xi2", "2020-07-01T22:31:00.000+0000", "2020-07-01T23:53:00.000+0000"), ("Company B","xi1", "2020-07-01T23:07:00.000+0000", "2020-07-02T00:29:00.000+0000"), ("Company A","xi1", "2020-07-01T22:19:00.000+0000", "2020-07-01T23:41:00.000+0000"), ("Company B","xi1", "2020-07-02T00:07:00.000+0000", "2020-07-02T01:29:00.000+0000"), ("Company B","xi1", "2020-07-02T00:55:00.000+0000", "2020-07-02T02:17:00.000+0000"), ("Company A","xi1", "2020-07-02T00:19:00.000+0000", "2020-07-02T01:41:00.000+0000"), ("Company A","xi2", "2020-07-01T22:55:00.000+0000", "2020-07-02T00:17:00.000+0000"), ("Company B","xi2", "2020-07-02T00:43:00.000+0000", "2020-07-02T02:05:00.000+0000"), ("Company A","xi2", "2020-07-01T23:31:00.000+0000", "2020-07-02T00:53:00.000+0000"), ("Company B","xi1", "2020-07-01T23:19:00.000+0000", "2020-07-02T00:41:00.000+0000"), ("Company A","xi2", "2020-07-01T23:43:00.000+0000", "2020-07-02T01:05:00.000+0000"), ("Company A","xi2", "2020-07-02T00:31:00.000+0000", "2020-07-02T01:53:00.000+0000"), ("Company A","xi2", "2020-07-01T22:43:00.000+0000", "2020-07-02T00:05:00.000+0000")  )).toDF("customer","device_model","start_timestamp","end_timestamp")
.withColumn("start_timestamp", to_timestamp($"start_timestamp"))
.withColumn("end_timestamp", to_timestamp($"end_timestamp"))
display(df) // Databricks notebook helper; outside Databricks use df.show(false)

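I'd like my results to look like the following. These counts are for 30-minute intervals, but eventually I'll be counting at intervals as small as one or two minutes.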

timeinterval           customer  xi1 xi2
2020-07-01 22:30:00  Company A   1   1
2020-07-01 22:30:00  Company B   0   1
2020-07-01 23:00:00  Company A   1   2
2020-07-01 23:00:00  Company B   1   1
2020-07-01 23:30:00  Company A   1   4
2020-07-01 23:30:00  Company B   2   1
2020-07-02 00:00:00  Company A   1   4
2020-07-02 00:00:00  Company B   3   1
2020-07-02 00:30:00  Company A   2   4
2020-07-02 00:30:00  Company B   3   1
2020-07-02 01:00:00  Company A   2   3
2020-07-02 01:00:00  Company B   2   2
2020-07-02 01:30:00  Company A   2   1
2020-07-02 01:30:00  Company B   2   2
2020-07-02 02:00:00  Company A   0   1
2020-07-02 02:00:00  Company B   1   2
2020-07-02 02:30:00  Company B   1   1

Any help or ideas on the best approach would be greatly appreciated.

Perhaps this helps:

Load the provided test data

    import org.apache.spark.sql.functions._
    import spark.implicits._ // needed for .toDF and the $"column" syntax

    val df = spark.sparkContext.parallelize(List(
      ("Company B","xi2", "2020-07-02T01:07:00.000+0000", "2020-07-02T02:29:00.000+0000"),
      ("Company A","xi1", "2020-07-01T23:55:00.000+0000", "2020-07-02T01:17:00.000+0000"),
      ("Company B","xi2", "2020-07-01T22:31:00.000+0000", "2020-07-01T23:53:00.000+0000"),
      ("Company B","xi1", "2020-07-01T23:07:00.000+0000", "2020-07-02T00:29:00.000+0000"),
      ("Company A","xi1", "2020-07-01T22:19:00.000+0000", "2020-07-01T23:41:00.000+0000"),
      ("Company B","xi1", "2020-07-02T00:07:00.000+0000", "2020-07-02T01:29:00.000+0000"),
      ("Company B","xi1", "2020-07-02T00:55:00.000+0000", "2020-07-02T02:17:00.000+0000"),
      ("Company A","xi1", "2020-07-02T00:19:00.000+0000", "2020-07-02T01:41:00.000+0000"),
      ("Company A","xi2", "2020-07-01T22:55:00.000+0000", "2020-07-02T00:17:00.000+0000"),
      ("Company B","xi2", "2020-07-02T00:43:00.000+0000", "2020-07-02T02:05:00.000+0000"),
      ("Company A","xi2", "2020-07-01T23:31:00.000+0000", "2020-07-02T00:53:00.000+0000"),
      ("Company B","xi1", "2020-07-01T23:19:00.000+0000", "2020-07-02T00:41:00.000+0000"),
      ("Company A","xi2", "2020-07-01T23:43:00.000+0000", "2020-07-02T01:05:00.000+0000"),
      ("Company A","xi2", "2020-07-02T00:31:00.000+0000", "2020-07-02T01:53:00.000+0000"),
      ("Company A","xi2", "2020-07-01T22:43:00.000+0000", "2020-07-02T00:05:00.000+0000")
    ))
      .toDF("customer","device_model","start_timestamp","end_timestamp")
      // parse the ISO-8601 strings, including the +0000 offset
      .withColumn("start_timestamp", to_timestamp($"start_timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
      .withColumn("end_timestamp", to_timestamp($"end_timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
    df.show(false)
    /**
      * +---------+------------+-------------------+-------------------+
      * |customer |device_model|start_timestamp    |end_timestamp      |
      * +---------+------------+-------------------+-------------------+
      * |Company B|xi2         |2020-07-02 06:37:00|2020-07-02 07:59:00|
      * |Company A|xi1         |2020-07-02 05:25:00|2020-07-02 06:47:00|
      * |Company B|xi2         |2020-07-02 04:01:00|2020-07-02 05:23:00|
      * |Company B|xi1         |2020-07-02 04:37:00|2020-07-02 05:59:00|
      * |Company A|xi1         |2020-07-02 03:49:00|2020-07-02 05:11:00|
      * |Company B|xi1         |2020-07-02 05:37:00|2020-07-02 06:59:00|
      * |Company B|xi1         |2020-07-02 06:25:00|2020-07-02 07:47:00|
      * |Company A|xi1         |2020-07-02 05:49:00|2020-07-02 07:11:00|
      * |Company A|xi2         |2020-07-02 04:25:00|2020-07-02 05:47:00|
      * |Company B|xi2         |2020-07-02 06:13:00|2020-07-02 07:35:00|
      * |Company A|xi2         |2020-07-02 05:01:00|2020-07-02 06:23:00|
      * |Company B|xi1         |2020-07-02 04:49:00|2020-07-02 06:11:00|
      * |Company A|xi2         |2020-07-02 05:13:00|2020-07-02 06:35:00|
      * |Company A|xi2         |2020-07-02 06:01:00|2020-07-02 07:23:00|
      * |Company A|xi2         |2020-07-02 04:13:00|2020-07-02 05:35:00|
      * +---------+------------+-------------------+-------------------+
      */
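The timestamps shown above are 5 hours 30 minutes later than the input strings (01:07 UTC appears as 06:37), which is consistent with the Spark session time zone being UTC+05:30 where this output was produced. That is why the intervals in the outputs below don't read like the UTC times in the question. Because the offset is a multiple of 30 minutes, the 30-minute bins are the same instants either way (06:30 in the output is 01:00 UTC); only the rendering differs. A minimal sketch, assuming a local SparkSession, of setting spark.sql.session.timeZone so timestamps are rendered in UTC:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("session-tz").getOrCreate()
import spark.implicits._

// Render timestamps in UTC instead of the JVM's default zone.
spark.conf.set("spark.sql.session.timeZone", "UTC")

val rendered = Seq("2020-07-02T01:07:00.000+0000").toDF("raw")
  .select(to_timestamp($"raw", "yyyy-MM-dd'T'HH:mm:ss.SSSZ").as("ts"))
  // date_format also uses the session time zone, so this is the UTC wall-clock time
  .select(date_format($"ts", "yyyy-MM-dd HH:mm:ss"))
  .as[String]
  .head()

println(rendered)
```

The sample strings carry an explicit +0000 offset, so this setting changes how the parsed values are displayed, not which instants they represent.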

Generate the bins/intervals with sequence + interval

Change intervalInMinutes as required.

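    val intervalInMinutes = 30
    val seconds = intervalInMinutes * 60 // interval length in seconds
    val p = df
      // floor each start time to the beginning of its interval (epoch-seconds arithmetic)
      .withColumn("new_start", to_timestamp(floor($"start_timestamp".cast("long") / seconds) * seconds))
      // one array element per interval start, from new_start up to end_timestamp (inclusive)
      .withColumn("splits", sequence(
        $"new_start",
        $"end_timestamp",
        expr(s"interval $intervalInMinutes MINUTE")))
    p.show(false)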

    /**
      * +---------+------------+-------------------+-------------------+-------------------+------------------------------------------------------------------------------------+
      * |customer |device_model|start_timestamp    |end_timestamp      |new_start          |splits                                                                              |
      * +---------+------------+-------------------+-------------------+-------------------+------------------------------------------------------------------------------------+
      * |Company B|xi2         |2020-07-02 06:37:00|2020-07-02 07:59:00|2020-07-02 06:30:00|[2020-07-02 06:30:00, 2020-07-02 07:00:00, 2020-07-02 07:30:00]                     |
      * |Company A|xi1         |2020-07-02 05:25:00|2020-07-02 06:47:00|2020-07-02 05:00:00|[2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00]|
      * |Company B|xi2         |2020-07-02 04:01:00|2020-07-02 05:23:00|2020-07-02 04:00:00|[2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00]                     |
      * |Company B|xi1         |2020-07-02 04:37:00|2020-07-02 05:59:00|2020-07-02 04:30:00|[2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00]                     |
      * |Company A|xi1         |2020-07-02 03:49:00|2020-07-02 05:11:00|2020-07-02 03:30:00|[2020-07-02 03:30:00, 2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00]|
      * |Company B|xi1         |2020-07-02 05:37:00|2020-07-02 06:59:00|2020-07-02 05:30:00|[2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00]                     |
      * |Company B|xi1         |2020-07-02 06:25:00|2020-07-02 07:47:00|2020-07-02 06:00:00|[2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00, 2020-07-02 07:30:00]|
      * |Company A|xi1         |2020-07-02 05:49:00|2020-07-02 07:11:00|2020-07-02 05:30:00|[2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00]|
      * |Company A|xi2         |2020-07-02 04:25:00|2020-07-02 05:47:00|2020-07-02 04:00:00|[2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00]|
      * |Company B|xi2         |2020-07-02 06:13:00|2020-07-02 07:35:00|2020-07-02 06:00:00|[2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00, 2020-07-02 07:30:00]|
      * |Company A|xi2         |2020-07-02 05:01:00|2020-07-02 06:23:00|2020-07-02 05:00:00|[2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00]                     |
      * |Company B|xi1         |2020-07-02 04:49:00|2020-07-02 06:11:00|2020-07-02 04:30:00|[2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00]|
      * |Company A|xi2         |2020-07-02 05:13:00|2020-07-02 06:35:00|2020-07-02 05:00:00|[2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00]|
      * |Company A|xi2         |2020-07-02 06:01:00|2020-07-02 07:23:00|2020-07-02 06:00:00|[2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00]                     |
      * |Company A|xi2         |2020-07-02 04:13:00|2020-07-02 05:35:00|2020-07-02 04:00:00|[2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00]|
      * +---------+------------+-------------------+-------------------+-------------------+------------------------------------------------------------------------------------+
      */
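Since the interval is meant to shrink to one or two minutes, it may be convenient to wrap this step in a function that takes the interval as a parameter and explodes straight to one row per (session, interval start). This is a sketch under stated assumptions: explodeToBins is a hypothetical name, it expects the column names used above, and timestamp_seconds requires Spark 3.1 or later.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical helper: one row per (session, interval start) the session overlaps.
// Expects columns customer, device_model, start_timestamp, end_timestamp (timestamps).
def explodeToBins(sessions: DataFrame, intervalInMinutes: Int): DataFrame = {
  val seconds = intervalInMinutes * 60L
  // Floor the start to the beginning of its interval, in epoch seconds.
  val binStart = timestamp_seconds(floor(col("start_timestamp").cast("long") / seconds) * seconds)
  sessions.select(
    col("customer"),
    col("device_model"),
    // sequence() includes the stop value, so a bin starting exactly at end_timestamp is kept
    explode(sequence(binStart, col("end_timestamp"), expr(s"interval $intervalInMinutes MINUTE")))
      .as("timeinterval")
  )
}

// Usage on the first sample session (01:07 to 02:29 UTC).
val spark = SparkSession.builder().master("local[*]").appName("bins").getOrCreate()
import spark.implicits._
spark.conf.set("spark.sql.session.timeZone", "UTC")

val one = Seq(("Company B", "xi2", "2020-07-02 01:07:00", "2020-07-02 02:29:00"))
  .toDF("customer", "device_model", "start_timestamp", "end_timestamp")
  .select($"customer", $"device_model",
    to_timestamp($"start_timestamp").as("start_timestamp"),
    to_timestamp($"end_timestamp").as("end_timestamp"))

val intervals = explodeToBins(one, 30)
  .select(date_format($"timeinterval", "HH:mm"))
  .as[String]
  .collect()
  .toSeq
```

The exploded result can then go through the same groupBy/pivot/count as below; the only change from the code above is that the interval becomes a function argument.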

Get the count for each interval with pivot + count


    p.select($"customer", $"device_model", explode($"splits").as("timeinterval"))
      .groupBy("timeinterval", "customer")
      .pivot("device_model")
      .agg(
        count("device_model")
      )
      .withColumn("xi1", coalesce($"xi1", lit(0)))
      .withColumn("xi2", coalesce($"xi2", lit(0)))
      .orderBy("timeinterval", "customer")
      .show(false)

    /**
      * +-------------------+---------+---+---+
      * |timeinterval       |customer |xi1|xi2|
      * +-------------------+---------+---+---+
      * |2020-07-02 03:30:00|Company A|1  |0  |
      * |2020-07-02 04:00:00|Company A|1  |2  |
      * |2020-07-02 04:00:00|Company B|0  |1  |
      * |2020-07-02 04:30:00|Company A|1  |2  |
      * |2020-07-02 04:30:00|Company B|2  |1  |
      * |2020-07-02 05:00:00|Company A|2  |4  |
      * |2020-07-02 05:00:00|Company B|2  |1  |
      * |2020-07-02 05:30:00|Company A|2  |4  |
      * |2020-07-02 05:30:00|Company B|3  |0  |
      * |2020-07-02 06:00:00|Company A|2  |3  |
      * |2020-07-02 06:00:00|Company B|3  |1  |
      * |2020-07-02 06:30:00|Company A|2  |2  |
      * |2020-07-02 06:30:00|Company B|2  |2  |
      * |2020-07-02 07:00:00|Company A|1  |1  |
      * |2020-07-02 07:00:00|Company B|1  |2  |
      * |2020-07-02 07:30:00|Company B|1  |2  |
      * +-------------------+---------+---+---+
      */
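For the scaling concern, one adjustment that may help is passing the device models to pivot explicitly: Spark's documentation notes that the pivot overload without a value list has to compute the distinct values itself first, which costs an extra pass over the data. The sketch below assumes the models are known up front (xi1 and xi2 here); countPerInterval is a hypothetical helper, the input rows are hand-written for illustration, and na.fill(0L, ...) takes the place of the two coalesce calls.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical helper: `exploded` has one row per (timeinterval, customer, device_model).
def countPerInterval(exploded: DataFrame, models: Seq[String]): DataFrame =
  exploded
    .groupBy("timeinterval", "customer")
    .pivot("device_model", models) // explicit values: no separate job to find distinct models
    .agg(count(lit(1)))
    .na.fill(0L, models)           // interval/model combinations with no sessions become 0
    .orderBy("timeinterval", "customer")

val spark = SparkSession.builder().master("local[*]").appName("pivot").getOrCreate()
import spark.implicits._

// Hand-written exploded rows, for illustration only.
val exploded = Seq(
  ("01:00", "Company B", "xi2"),
  ("01:00", "Company B", "xi2"),
  ("01:00", "Company B", "xi1"),
  ("01:30", "Company B", "xi2")
).toDF("timeinterval", "customer", "device_model")

val result = countPerInterval(exploded, Seq("xi1", "xi2"))
result.show(false)

val rows = result.collect()
  .map(r => (r.getString(0), r.getString(1), r.getLong(2), r.getLong(3)))
  .toSeq
```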