使用数据块对过去 7 天的每一天的不同用户进行计数

Counter distinct user for each days for the last 7 days with databricks

我想统计过去 7 天每一天的不同用户数。

我有下面一行代码:

days = lambda i: i * 86400

df1 = sc.parallelize(
  [
    ['2021-05-01','aaa'], 
    ['2021-05-08','aaa'], 
    ['2021-05-15','aaa'], 
    ['2021-05-15','bbb'], 
    ['2021-06-01','aaa'], 
    ['2021-06-10','bbb'],
    ['2021-06-25','aaa'], 
    ['2021-06-30','aaa'], 
    ['2021-07-01','bbb'], 
    ['2021-07-10','aaa'],
    ['2021-07-14','bbb'], 
    ['2021-07-15','bbb'], 
    ['2021-07-25','bbb'], 
    ['2021-07-30','bbb'],
  ]).toDF(("date", "userId"))

df1.printSchema()

df1 = df1.withColumn("date", f.to_date("date", "yyyy-MM-dd"))

df1 = (df1
       .select('date', 'userId')
       .withColumn('7_days_active_users', f.approx_count_distinct('userId').over(Window.orderBy(f.col('date').cast('long')).rangeBetween(-days(7), 0)))
      )

df1.show()

这是输出:

+----------+------+-------------------+
|      date|userId|7_days_active_users|
+----------+------+-------------------+
|2021-05-01|   aaa|                  2|
|2021-05-08|   aaa|                  2|
|2021-05-15|   aaa|                  2|
|2021-05-15|   bbb|                  2|
|2021-06-01|   aaa|                  2|
|2021-06-10|   bbb|                  2|
|2021-06-25|   aaa|                  2|
|2021-06-30|   aaa|                  2|
|2021-07-01|   bbb|                  2|
|2021-07-10|   aaa|                  2|
|2021-07-14|   bbb|                  2|
|2021-07-15|   bbb|                  2|
|2021-07-25|   bbb|                  2|
|2021-07-30|   bbb|                  2|

对于 2021-05-08 他们应该只有一个不同的用户。

这是怎么回事?

您似乎在尝试按 date 列的 unix 时间戳对 window 进行排序。如果是这样,则将 DateType 转换为 LongType 将导致 null,从而得到上面的结果。

尝试先使用 unix_timestamp 函数或 .cast('timestamp') 方法将列 date 转换为时间戳,然后再将其转换为 LongType。

w = (Window
     .orderBy(f.col('date').cast('timestamp').cast('long'))
     .rangeBetween(-days(7), 0))

df1 = (df1
       .select('date', 'userId')
       .withColumn('7_days_active_users', 
                   f.approx_count_distinct('userId').over(w))
       )
df1.show()

+----------+------+-------------------+
|      date|userId|7_days_active_users|
+----------+------+-------------------+
|2021-05-01|   aaa|                  1|
|2021-05-08|   aaa|                  1|
|2021-05-15|   aaa|                  2|
|2021-05-15|   bbb|                  2|
|2021-06-01|   aaa|                  1|
|2021-06-10|   bbb|                  1|
|2021-06-25|   aaa|                  1|
|2021-06-30|   aaa|                  1|
|2021-07-01|   bbb|                  2|
|2021-07-10|   aaa|                  1|
|2021-07-14|   bbb|                  2|
|2021-07-15|   bbb|                  2|
|2021-07-25|   bbb|                  1|
|2021-07-30|   bbb|                  1|
+----------+------+-------------------+