Adding dictionary keys as column name and dictionary value as the constant value of that column in Pyspark df
I have a dictionary x = {'colA': 20, 'colB': 30}
and a PySpark DataFrame df:
ID Value
1 ABC
1 BCD
1 AKB
2 CAB
2 AIK
3 KIB
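For reference, the sample data can be reproduced roughly like this (a minimal sketch, assuming an existing SparkSession named spark):
x = {'colA': 20, 'colB': 30}
df = spark.createDataFrame(
    [(1, "ABC"), (1, "BCD"), (1, "AKB"), (2, "CAB"), (2, "AIK"), (3, "KIB")],
    ["ID", "Value"],
)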
I want to use x to create df1 as below:
ID Value colA colB
1 ABC 20.0 30.0
1 BCD 20.0 30.0
1 AKB 20.0 30.0
2 CAB 20.0 30.0
...
Any idea how to do this using PySpark?
I know I can create a constant column like this:
from pyspark.sql.functions import lit
df1 = df.withColumn('colA', lit(20.0))
df1 = df1.withColumn('colB', lit(30.0))
but I'm not sure how to do this dynamically from the dictionary.

Loop over the dictionary as below:
df1 = df
for key in x:
df1 = df1.withColumn(key, lit(x[key]))
There are ways to hide the loop, but the execution will be the same. For instance, you can use select:
from pyspark.sql.functions import lit
df2 = df.select("*", *[lit(val).alias(key) for key, val in x.items()])
df2.show()
#+---+-----+----+----+
#| ID|Value|colB|colA|
#+---+-----+----+----+
#| 1| ABC| 30| 20|
#| 1| BCD| 30| 20|
#| 1| AKB| 30| 20|
#| 2| CAB| 30| 20|
#| 2| AIK| 30| 20|
#| 3| KIB| 30| 20|
#+---+-----+----+----+
Or functools.reduce and withColumn:
from functools import reduce
df3 = reduce(lambda df, key: df.withColumn(key, lit(x[key])), x, df)
df3.show()
# Same as above
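For clarity, the reduce call just folds one withColumn per key onto df; a sketch of what it expands to for this dictionary (df3_unrolled is an illustrative name, and this assumes the dict iterates in insertion order, as on Python 3.7+):
from pyspark.sql.functions import lit
# Equivalent chained form that the reduce call expands to for x = {'colA': 20, 'colB': 30}
df3_unrolled = df.withColumn('colA', lit(20)).withColumn('colB', lit(30))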
Or pyspark.sql.functions.struct with select:
from pyspark.sql.functions import struct
df4 = df.withColumn('x', struct([lit(val).alias(key) for key, val in x.items()]))\
.select("ID", "Value", "x.*")
df4.show()
#Same as above
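To see why this works: before "x.*" is selected, the constants sit inside a single struct column. A small sketch of that intermediate step (df_tmp is just an illustrative name):
from pyspark.sql.functions import lit, struct
# Intermediate step: all constants packed into one struct column named 'x'
df_tmp = df.withColumn('x', struct([lit(val).alias(key) for key, val in x.items()]))
# df_tmp now has columns ID, Value and x, where x is a struct with fields colA and colB;
# selecting "ID", "Value", "x.*" flattens those fields back into top-level columns.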
But if you look at the execution plans for these methods, you'll see that they are exactly the same:
df2.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#151, 20 AS colA#152]
#+- Scan ExistingRDD[ID#44L,Value#45]
df3.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#102, 20 AS colA#107]
#+- Scan ExistingRDD[ID#44L,Value#45]
df4.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#120, 20 AS colA#121]
#+- Scan ExistingRDD[ID#44L,Value#45]
Further, if you compare the loop method from @anil's answer:
df1 = df
for key in x:
df1 = df1.withColumn(key, lit(x[key]))
df1.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#127, 20 AS colA#132]
#+- Scan ExistingRDD[ID#44L,Value#45]
you will see that it is the same as well.
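If this comes up often, the pattern can be wrapped in a small helper; a minimal sketch (the name with_constants is just illustrative):
from pyspark.sql.functions import lit

def with_constants(df, constants):
    # Return df with one extra literal column per (name, value) pair in constants
    return df.select("*", *[lit(val).alias(name) for name, val in constants.items()])

df1 = with_constants(df, x)  # same result, and the same physical plan as the variants above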