嗨,你能帮我解决在 Pyspark 中创建新专栏时的问题吗:我对问题的解释如下:
HI,Could you please help me resolving Issue while creating new column in Pyspark: I explained the issue as below:
我正在使用的查询:
我想根据条件用新值替换现有列,如果另一个列的值 = ABC,则该列保持不变,否则应为 null 或空白。
它按照逻辑给出结果,但仅针对它在循环中遇到的最后一列。
import pyspark.sql.functions as F
for i in df.columns:
if i[4:]!='ff':
new_df=df.withColumn(i,F.when(df.col_ff=="abc",df[i])\
.otherwise(None))
df:
+------+----+-----+-------+
| col1 |col2|col3 | col_ff|
+------+----+-----+-------+
| a | a | d | abc |
| a | b | c | def |
| b | c | b | abc |
| c | d | a | def |
+------+----+-----+-------+
要求输出:
+------+----+-----+-------+
| col1 |col2|col3 | col_ff|
+------+----+-----+-------+
| a | a | d | abc |
| null |null|null | def |
| b | c | b | abc |
| null |null|null | def |
+------+----+-----+-------+
您的代码中的问题是您在循环的每次迭代中都用原始 DataFrame df
覆盖了 new_df
。您可以通过先在循环外设置 new_df = df
,然后在循环内对 new_df
执行 withColumn
操作来修复它。
例如,如果 df
如下:
df.show()
#+----+----+----+------+
#|col1|col2|col3|col_ff|
#+----+----+----+------+
#| a| a| d| abc|
#| a| b| c| def|
#| b| c| b| abc|
#| c| d| a| def|
#+----+----+----+------+
将您的代码更改为:
import pyspark.sql.functions as F
new_df = df
for i in df.columns:
if i[4:]!='ff':
new_df = new_df.withColumn(i, F.when(F.col("col_ff")=="abc", F.col(i)))
注意这里我去掉了.otherwise(None)
部分,因为如果不满足条件,when
默认会returnnull
。
您也可以使用 functools.reduce
:
来做同样的事情
from functools import reduce # for python3
new_df = reduce(
lambda df, i: df.withColumn(i, F.when(F.col("col_ff")=="abc", F.col(i))),
[i for i in df.columns if i[4:] != "ff"],
df
)
两种情况下的结果是一样的:
new_df.show()
#+----+----+----+------+
#|col1|col2|col3|col_ff|
#+----+----+----+------+
#| a| a| d| abc|
#|null|null|null| def|
#| b| c| b| abc|
#|null|null|null| def|
#+----+----+----+------+
我正在使用的查询:
我想根据条件用新值替换现有列,如果另一个列的值 = ABC,则该列保持不变,否则应为 null 或空白。 它按照逻辑给出结果,但仅针对它在循环中遇到的最后一列。
import pyspark.sql.functions as F
for i in df.columns:
if i[4:]!='ff':
new_df=df.withColumn(i,F.when(df.col_ff=="abc",df[i])\
.otherwise(None))
df:
+------+----+-----+-------+
| col1 |col2|col3 | col_ff|
+------+----+-----+-------+
| a | a | d | abc |
| a | b | c | def |
| b | c | b | abc |
| c | d | a | def |
+------+----+-----+-------+
要求输出:
+------+----+-----+-------+
| col1 |col2|col3 | col_ff|
+------+----+-----+-------+
| a | a | d | abc |
| null |null|null | def |
| b | c | b | abc |
| null |null|null | def |
+------+----+-----+-------+
您的代码中的问题是您在循环的每次迭代中都用原始 DataFrame df
覆盖了 new_df
。您可以通过先在循环外设置 new_df = df
,然后在循环内对 new_df
执行 withColumn
操作来修复它。
例如,如果 df
如下:
df.show()
#+----+----+----+------+
#|col1|col2|col3|col_ff|
#+----+----+----+------+
#| a| a| d| abc|
#| a| b| c| def|
#| b| c| b| abc|
#| c| d| a| def|
#+----+----+----+------+
将您的代码更改为:
import pyspark.sql.functions as F
new_df = df
for i in df.columns:
if i[4:]!='ff':
new_df = new_df.withColumn(i, F.when(F.col("col_ff")=="abc", F.col(i)))
注意这里我去掉了.otherwise(None)
部分,因为如果不满足条件,when
默认会returnnull
。
您也可以使用 functools.reduce
:
from functools import reduce # for python3
new_df = reduce(
lambda df, i: df.withColumn(i, F.when(F.col("col_ff")=="abc", F.col(i))),
[i for i in df.columns if i[4:] != "ff"],
df
)
两种情况下的结果是一样的:
new_df.show()
#+----+----+----+------+
#|col1|col2|col3|col_ff|
#+----+----+----+------+
#| a| a| d| abc|
#|null|null|null| def|
#| b| c| b| abc|
#|null|null|null| def|
#+----+----+----+------+