嗨,你能帮我解决在 Pyspark 中创建新专栏时的问题吗:我对问题的解释如下:

HI,Could you please help me resolving Issue while creating new column in Pyspark: I explained the issue as below:

我正在使用的查询:

我想根据条件用新值替换现有列,如果另一个列的值 = ABC,则该列保持不变,否则应为 null 或空白。 它按照逻辑给出结果,但仅针对它在循环中遇到的最后一列。

import pyspark.sql.functions as F

for i in df.columns:
    if i[4:]!='ff':        
        new_df=df.withColumn(i,F.when(df.col_ff=="abc",df[i])\
       .otherwise(None))

df:
+------+----+-----+-------+
| col1 |col2|col3 | col_ff|
+------+----+-----+-------+
|   a  | a  | d   | abc   |
|   a  | b  | c   | def   |
|   b  | c  | b   | abc   |
|   c  | d  | a   | def   |
+------+----+-----+-------+

要求输出:

+------+----+-----+-------+
| col1 |col2|col3 | col_ff|
+------+----+-----+-------+
|   a  | a  | d   | abc   |
| null |null|null | def   |
|   b  | c  | b   | abc   |
| null |null|null | def   |
+------+----+-----+-------+

您的代码中的问题是您在循环的每次迭代中都用原始 DataFrame df 覆盖了 new_df。您可以通过先在循环外设置 new_df = df,然后在循环内对 new_df 执行 withColumn 操作来修复它。

例如,如果 df 如下:

df.show()
#+----+----+----+------+
#|col1|col2|col3|col_ff|
#+----+----+----+------+
#|   a|   a|   d|   abc|
#|   a|   b|   c|   def|
#|   b|   c|   b|   abc|
#|   c|   d|   a|   def|
#+----+----+----+------+

将您的代码更改为:

import pyspark.sql.functions as F

new_df = df
for i in df.columns:
    if i[4:]!='ff':        
        new_df = new_df.withColumn(i, F.when(F.col("col_ff")=="abc", F.col(i)))

注意这里我去掉了.otherwise(None)部分,因为如果不满足条件,when默认会returnnull

您也可以使用 functools.reduce:

来做同样的事情
from functools import reduce  # for python3
new_df = reduce(
    lambda df, i: df.withColumn(i, F.when(F.col("col_ff")=="abc", F.col(i))),
    [i for i in df.columns if i[4:] != "ff"], 
    df
)

两种情况下的结果是一样的:

new_df.show()
#+----+----+----+------+
#|col1|col2|col3|col_ff|
#+----+----+----+------+
#|   a|   a|   d|   abc|
#|null|null|null|   def|
#|   b|   c|   b|   abc|
#|null|null|null|   def|
#+----+----+----+------+