使用 Window 操作替换所有列值?

Replacing all column values using Window operation?

您好,数据框创建如下。

df = sc.parallelize([
    (1, 3),
    (2, 3),
    (3, 2),
    (4,2),
    (1, 3)
]).toDF(["id",'t']) 

显示如下。

+---+---+
| id|  t|
+---+---+
|  1|  3|
|  2|  3|
|  3|  2|
|  4|  2|
|  1|  3|
+---+---+

我的主要目的是,我想用重复次数替换每列中的重复值。

所以我尝试了流畅的代码,但它没有按预期工作。

from pyspark.sql.functions import col
column_list = ["id",'t']
w = Window.partitionBy(column_list)
dfmax=df.select(*((count(col(c)).over(w)).alias(c) for c in df.columns))
dfmax.show()
+---+---+
| id|  t|
+---+---+
|  2|  2|
|  2|  2|
|  1|  1|
|  1|  1|
|  1|  1|
+---+---+

我的预期输出是

+---+---+
| id|  t|
+---+---+
|  2|  3|
|  1|  3|
|  1|  1|
|  1|  1|
|  2|  3|
+---+---+

如果我没理解错的话,你要找的只是:

df.select(*[count(c).over(Window.partitionBy(c)).alias(c) for c in df.columns]).show()
#+---+---+
#| id|  t|
#+---+---+
#|  2|  3|
#|  2|  3|
#|  1|  2|
#|  1|  3|
#|  1|  2|
#+---+---+

这与您发布的内容之间的区别在于我们一次只按一列进行分区。

请记住,DataFrame 是无序的。如果你想维护你的行顺序,你可以使用 pyspark.sql.functions.monotonically_increasing_id():

添加一个排序列
from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("order", monotonically_increasing_id())\
    .select(*[count(c).over(Window.partitionBy(c)).alias(c) for c in df.columns])\
    .sort("order")\
    .drop("order")\
    .show()
#+---+---+
#| id|  t|
#+---+---+
#|  2|  3|
#|  1|  3|
#|  1|  2|
#|  1|  2|
#|  2|  3|
#+---+---+