Spark:根据两列计算事件
Spark: count events based on two columns
我有一个 table,其中包含按 uid 分组的事件。所有行都有 uid
、visit_num
和 event_num
.
列
visit_num
是一个偶尔增加的任意计数器。 event_num
是访问中交互的计数器。
我想将这两个计数器合并为一个交互计数器,该计数器每次事件都增加 1,并在下次访问开始时继续增加。
因为我只看事件之间的相对距离,所以不从1开始计数也没关系。
|uid |visit_num|event_num|interaction_num|
| 1 | 1 | 1 | 1 |
| 1 | 1 | 2 | 2 |
| 1 | 2 | 1 | 3 |
| 1 | 2 | 2 | 4 |
| 2 | 1 | 1 | 500 |
| 2 | 2 | 1 | 501 |
| 2 | 2 | 2 | 502 |
我可以通过重新分区数据并使用 monotonically_increasing_id
来实现这一点:
df.repartition("uid")\
.sort("visit_num", "event_num")\
.withColumn("iid", fn.monotonically_increasing_id())
但是文档指出:
生成的ID保证单调递增且唯一,但不连续。当前的实现将分区 ID 放在高 31 位,将每个分区内的记录号放在低 33 位。假设数据框少于10亿个分区,每个分区少于80亿条记录。
因为 id 似乎是按分区单调递增的,所以这似乎没问题。然而:
- 我即将达到 10 亿 partition/uid 门槛。
- 我不想依赖当前实施不变。
有什么方法可以让每个 uid 以 1 作为第一个交互编号开始?
编辑
进一步测试后,我注意到有些用户似乎没有使用上述方法获得连续的 iid
值。
编辑 2:窗口化
不幸的是,在某些(罕见的)情况下,超过one row has the same
visit_numand
event_num`。我试过如下使用窗口函数,但由于将相同的等级分配给两个相同的列,这不是一个真正的选择。
iid_window = Window.partitionBy("uid").orderBy("visit_num", "event_num")
df_sample_iid=df_sample.withColumn("iid", fn.rank().over(iid_window))
最好的解决方案是 Jacek Laskowski 所建议的带等级的窗口函数。
iid_window = Window.partitionBy("uid").orderBy("visit_num", "event_num")
df_sample_iid=df_sample.withColumn("iid", fn.rank().over(iid_window))
在我的具体情况下,需要进行更多的数据清理,但一般来说,这应该可行。
我有一个 table,其中包含按 uid 分组的事件。所有行都有 uid
、visit_num
和 event_num
.
visit_num
是一个偶尔增加的任意计数器。 event_num
是访问中交互的计数器。
我想将这两个计数器合并为一个交互计数器,该计数器每次事件都增加 1,并在下次访问开始时继续增加。
因为我只看事件之间的相对距离,所以不从1开始计数也没关系。
|uid |visit_num|event_num|interaction_num|
| 1 | 1 | 1 | 1 |
| 1 | 1 | 2 | 2 |
| 1 | 2 | 1 | 3 |
| 1 | 2 | 2 | 4 |
| 2 | 1 | 1 | 500 |
| 2 | 2 | 1 | 501 |
| 2 | 2 | 2 | 502 |
我可以通过重新分区数据并使用 monotonically_increasing_id
来实现这一点:
df.repartition("uid")\
.sort("visit_num", "event_num")\
.withColumn("iid", fn.monotonically_increasing_id())
但是文档指出:
生成的ID保证单调递增且唯一,但不连续。当前的实现将分区 ID 放在高 31 位,将每个分区内的记录号放在低 33 位。假设数据框少于10亿个分区,每个分区少于80亿条记录。
因为 id 似乎是按分区单调递增的,所以这似乎没问题。然而:
- 我即将达到 10 亿 partition/uid 门槛。
- 我不想依赖当前实施不变。
有什么方法可以让每个 uid 以 1 作为第一个交互编号开始?
编辑
进一步测试后,我注意到有些用户似乎没有使用上述方法获得连续的 iid
值。
编辑 2:窗口化
不幸的是,在某些(罕见的)情况下,超过one row has the same
visit_numand
event_num`。我试过如下使用窗口函数,但由于将相同的等级分配给两个相同的列,这不是一个真正的选择。
iid_window = Window.partitionBy("uid").orderBy("visit_num", "event_num")
df_sample_iid=df_sample.withColumn("iid", fn.rank().over(iid_window))
最好的解决方案是 Jacek Laskowski 所建议的带等级的窗口函数。
iid_window = Window.partitionBy("uid").orderBy("visit_num", "event_num")
df_sample_iid=df_sample.withColumn("iid", fn.rank().over(iid_window))
在我的具体情况下,需要进行更多的数据清理,但一般来说,这应该可行。