Pyspark 计数器字段,groupby 并递增 1
Pyspark counter field, groupby and increment by 1
我的数据框如下:
cola, colb
1, 2
1, 3
2, 1
2, 5
我想添加一列count
:
cola, colb, count
1, 2, 1
1, 3, 2
2, 1, 1
2, 5, 2
需要对 colA 进行分组并将第一个记录的计数设置为 1,然后将每个连续的行递增 1。
我尝试使用 window 函数,但它对 colA 中的所有记录应用相同的计数,但没有递增。
您已经发现 window 函数是正确的选择。可能你没用过rank功能
import pyspark.sql.functions as F
from pyspark.sql import Window
l = [(1 , 2),
(1 , 3 ),
(1 , 2 ),
(2 , 1 ),
(2 , 5)]
columns = ['cola', 'colb']
df=spark.createDataFrame(l, columns)
w = Window.partitionBy('cola').orderBy('colb')
df = df.withColumn('count', F.rank().over(w))
df.show()
输出:
+----+----+-----+
|cola|colb|count|
+----+----+-----+
| 1| 2| 1|
| 1| 2| 1|
| 1| 3| 3|
| 2| 1| 1|
| 2| 5| 2|
+----+----+-----+
如果您不希望等行后有空隙,您应该使用 dense_rank 函数。
我的数据框如下:
cola, colb
1, 2
1, 3
2, 1
2, 5
我想添加一列count
:
cola, colb, count
1, 2, 1
1, 3, 2
2, 1, 1
2, 5, 2
需要对 colA 进行分组并将第一个记录的计数设置为 1,然后将每个连续的行递增 1。
我尝试使用 window 函数,但它对 colA 中的所有记录应用相同的计数,但没有递增。
您已经发现 window 函数是正确的选择。可能你没用过rank功能
import pyspark.sql.functions as F
from pyspark.sql import Window
l = [(1 , 2),
(1 , 3 ),
(1 , 2 ),
(2 , 1 ),
(2 , 5)]
columns = ['cola', 'colb']
df=spark.createDataFrame(l, columns)
w = Window.partitionBy('cola').orderBy('colb')
df = df.withColumn('count', F.rank().over(w))
df.show()
输出:
+----+----+-----+
|cola|colb|count|
+----+----+-----+
| 1| 2| 1|
| 1| 2| 1|
| 1| 3| 3|
| 2| 1| 1|
| 2| 5| 2|
+----+----+-----+
如果您不希望等行后有空隙,您应该使用 dense_rank 函数。