Spark:根据两列计算事件

Spark: count events based on two columns

我有一个 table,其中包含按 uid 分组的事件。所有行都有 uidvisit_numevent_num.

visit_num 是一个偶尔增加的任意计数器。 event_num 是访问中交互的计数器。

我想将这两个计数器合并为一个交互计数器,该计数器每次事件都增加 1,并在下次访问开始时继续增加。

因为我只看事件之间的相对距离,所以不从1开始计数也没关系。

|uid |visit_num|event_num|interaction_num| | 1 | 1 | 1 | 1 | | 1 | 1 | 2 | 2 | | 1 | 2 | 1 | 3 | | 1 | 2 | 2 | 4 | | 2 | 1 | 1 | 500 | | 2 | 2 | 1 | 501 | | 2 | 2 | 2 | 502 |

我可以通过重新分区数据并使用 monotonically_increasing_id 来实现这一点:

df.repartition("uid")\
  .sort("visit_num", "event_num")\
  .withColumn("iid", fn.monotonically_increasing_id())

但是文档指出:

生成的ID保证单调递增且唯一,但不连续。当前的实现将分区 ID 放在高 31 位,将每个分区内的记录号放在低 33 位。假设数据框少于10亿个分区,每个分区少于80亿条记录。

因为 id 似乎是按分区单调递增的,所以这似乎没问题。然而:

有什么方法可以让每个 uid 以 1 作为第一个交互编号开始?

编辑

进一步测试后,我注意到有些用户似乎没有使用上述方法获得连续的 iid 值。

编辑 2:窗口化

不幸的是,在某些(罕见的)情况下,超过one row has the samevisit_numandevent_num`。我试过如下使用窗口函数,但由于将相同的等级分配给两个相同的列,这不是一个真正的选择。

iid_window = Window.partitionBy("uid").orderBy("visit_num", "event_num")
df_sample_iid=df_sample.withColumn("iid", fn.rank().over(iid_window))    

最好的解决方案是 Jacek Laskowski 所建议的带等级的窗口函数。

iid_window = Window.partitionBy("uid").orderBy("visit_num", "event_num")
df_sample_iid=df_sample.withColumn("iid", fn.rank().over(iid_window))

在我的具体情况下,需要进行更多的数据清理,但一般来说,这应该可行。