将转换应用于多列 pyspark 数据框
Apply a transformation to multiple columns pyspark dataframe
假设我有以下 spark-dataframe:
+-----+-------+
| word| label|
+-----+-------+
| red| color|
| red| color|
| blue| color|
| blue|feeling|
|happy|feeling|
+-----+-------+
可以使用以下代码创建:
sample_df = spark.createDataFrame([
('red', 'color'),
('red', 'color'),
('blue', 'color'),
('blue', 'feeling'),
('happy', 'feeling')
],
('word', 'label')
)
我可以执行 groupBy()
来获取每个词标签对的计数:
sample_df = sample_df.groupBy('word', 'label').count()
#+-----+-------+-----+
#| word| label|count|
#+-----+-------+-----+
#| blue| color| 1|
#| blue|feeling| 1|
#| red| color| 2|
#|happy|feeling| 1|
#+-----+-------+-----+
然后 pivot()
和 sum()
将标签计数为列:
import pyspark.sql.functions as f
sample_df = sample_df.groupBy('word').pivot('label').agg(f.sum('count')).na.fill(0)
#+-----+-----+-------+
#| word|color|feeling|
#+-----+-----+-------+
#| red| 2| 0|
#|happy| 0| 1|
#| blue| 1| 1|
#+-----+-----+-------+
转换此 dataframe
以便每一行除以该行的总数的最佳方法是什么?
# Desired output
+-----+-----+-------+
| word|color|feeling|
+-----+-----+-------+
| red| 1.0| 0.0|
|happy| 0.0| 1.0|
| blue| 0.5| 0.5|
+-----+-----+-------+
实现此结果的一种方法是使用 __builtin__.sum
(不是 pyspark.sql.functions.sum
)获取行式总和,然后为每个标签调用 withColumn()
:
labels = ['color', 'feeling']
sample_df.withColumn('total', sum([f.col(x) for x in labels]))\
.withColumn('color', f.col('color')/f.col('total'))\
.withColumn('feeling', f.col('feeling')/f.col('total'))\
.select('word', 'color', 'feeling')\
.show()
但是必须有比枚举每个可能的列更好的方法。
更笼统地说,我的问题是:
如何将当前行的函数同时应用于多个列?
在 this Medium post 上找到了答案。
首先为总计创建一列(如上所述),然后使用 *
运算符对 select()
:
中的标签解包列表理解
labels = ['color', 'feeling']
sample_df = sample_df.withColumn('total', sum([f.col(x) for x in labels]))
sample_df.select(
'word', *[(f.col(col_name)/f.col('total')).alias(col_name) for col_name in labels]
).show()
链接 post 上显示的方法展示了如何将其推广到任意转换。
假设我有以下 spark-dataframe:
+-----+-------+
| word| label|
+-----+-------+
| red| color|
| red| color|
| blue| color|
| blue|feeling|
|happy|feeling|
+-----+-------+
可以使用以下代码创建:
sample_df = spark.createDataFrame([
('red', 'color'),
('red', 'color'),
('blue', 'color'),
('blue', 'feeling'),
('happy', 'feeling')
],
('word', 'label')
)
我可以执行 groupBy()
来获取每个词标签对的计数:
sample_df = sample_df.groupBy('word', 'label').count()
#+-----+-------+-----+
#| word| label|count|
#+-----+-------+-----+
#| blue| color| 1|
#| blue|feeling| 1|
#| red| color| 2|
#|happy|feeling| 1|
#+-----+-------+-----+
然后 pivot()
和 sum()
将标签计数为列:
import pyspark.sql.functions as f
sample_df = sample_df.groupBy('word').pivot('label').agg(f.sum('count')).na.fill(0)
#+-----+-----+-------+
#| word|color|feeling|
#+-----+-----+-------+
#| red| 2| 0|
#|happy| 0| 1|
#| blue| 1| 1|
#+-----+-----+-------+
转换此 dataframe
以便每一行除以该行的总数的最佳方法是什么?
# Desired output
+-----+-----+-------+
| word|color|feeling|
+-----+-----+-------+
| red| 1.0| 0.0|
|happy| 0.0| 1.0|
| blue| 0.5| 0.5|
+-----+-----+-------+
实现此结果的一种方法是使用 __builtin__.sum
(不是 pyspark.sql.functions.sum
)获取行式总和,然后为每个标签调用 withColumn()
:
labels = ['color', 'feeling']
sample_df.withColumn('total', sum([f.col(x) for x in labels]))\
.withColumn('color', f.col('color')/f.col('total'))\
.withColumn('feeling', f.col('feeling')/f.col('total'))\
.select('word', 'color', 'feeling')\
.show()
但是必须有比枚举每个可能的列更好的方法。
更笼统地说,我的问题是:
如何将当前行的函数同时应用于多个列?
在 this Medium post 上找到了答案。
首先为总计创建一列(如上所述),然后使用 *
运算符对 select()
:
labels = ['color', 'feeling']
sample_df = sample_df.withColumn('total', sum([f.col(x) for x in labels]))
sample_df.select(
'word', *[(f.col(col_name)/f.col('total')).alias(col_name) for col_name in labels]
).show()
链接 post 上显示的方法展示了如何将其推广到任意转换。