在 pyspark 中对列表中的不同数据框列求和的正确方法是什么?
Whats is the correct way to sum different dataframe columns in a list in pyspark?
我想对 spark 数据框中的不同列求和。
代码
from pyspark.sql import functions as F
cols = ["A.p1","B.p1"]
df = spark.createDataFrame([[1,2],[4,89],[12,60]],schema=cols)
# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))
为什么不是方法 2。 & #3。不工作?
我在使用 Spark 2.2
因为,
# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
这里你使用的是 python 内置的求和函数,它将可迭代作为输入,所以它可以工作。 https://docs.python.org/2/library/functions.html#sum
#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
此处您使用的是 pyspark sum 函数,该函数将列作为输入,但您正试图在行级别获取它。
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.sum
#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))
在这里,df.select() returns 一个数据帧并尝试对一个数据帧求和。在这种情况下,我认为,您必须逐行迭代并对它应用求和。
TL;DR builtins.sum
就好了。
关注你的:
Using native python sum() is not benefitting from spark optimization. so whats the spark way of doing it
its not a pypark function so it wont be really be completely benefiting from spark right.
我可以看出你做出了错误的假设。
我们来分解问题:
[df[col] for col in ["`A.p1`","`B.p1`"]]
创建 Columns
的列表:
[Column<b'A.p1'>, Column<b'B.p1'>]
我们称它为iterable
。
sum
通过获取此列表的元素并调用 __add__
方法 (+
) 来减少输出。命令等价物是:
accum = iterable[0]
for element in iterable[1:]:
accum = accum + element
这给出 Column
:
Column<b'(A.p1 + B.p1)'>
这与调用
相同
df["`A.p1`"] + df["`B.p1`"]
未触及任何数据,评估时它受益于所有 Spark 优化。
将列表中的多列添加到一列中
我尝试了很多方法,以下是我的观察:
- PySpark 的
sum
函数不支持列添加(Pyspark 版本 2.3.1)
- 内置 python 的
sum
函数对某些人有效,但对其他人却出错(可能是因为名称冲突)
在您的第三种方法中,表达式(在 python 的 sum
函数内)返回 PySpark DataFrame。
因此,可以使用PySpark中的expr
函数实现多列的添加,该函数将要计算的表达式作为输入。
from pyspark.sql.functions import expr
cols_list = ['a', 'b', 'c']
# Creating an addition expression using `join`
expression = '+'.join(cols_list)
df = df.withColumn('sum_cols', expr(expression))
这为我们提供了所需的列总和。我们还可以使用任何其他复杂表达式来获得其他输出。
我想对 spark 数据框中的不同列求和。
代码
from pyspark.sql import functions as F
cols = ["A.p1","B.p1"]
df = spark.createDataFrame([[1,2],[4,89],[12,60]],schema=cols)
# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))
为什么不是方法 2。 & #3。不工作? 我在使用 Spark 2.2
因为,
# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
这里你使用的是 python 内置的求和函数,它将可迭代作为输入,所以它可以工作。 https://docs.python.org/2/library/functions.html#sum
#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
此处您使用的是 pyspark sum 函数,该函数将列作为输入,但您正试图在行级别获取它。 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.sum
#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))
在这里,df.select() returns 一个数据帧并尝试对一个数据帧求和。在这种情况下,我认为,您必须逐行迭代并对它应用求和。
TL;DR builtins.sum
就好了。
关注你的
Using native python sum() is not benefitting from spark optimization. so whats the spark way of doing it
its not a pypark function so it wont be really be completely benefiting from spark right.
我可以看出你做出了错误的假设。
我们来分解问题:
[df[col] for col in ["`A.p1`","`B.p1`"]]
创建 Columns
的列表:
[Column<b'A.p1'>, Column<b'B.p1'>]
我们称它为iterable
。
sum
通过获取此列表的元素并调用 __add__
方法 (+
) 来减少输出。命令等价物是:
accum = iterable[0]
for element in iterable[1:]:
accum = accum + element
这给出 Column
:
Column<b'(A.p1 + B.p1)'>
这与调用
相同df["`A.p1`"] + df["`B.p1`"]
未触及任何数据,评估时它受益于所有 Spark 优化。
将列表中的多列添加到一列中
我尝试了很多方法,以下是我的观察:
- PySpark 的
sum
函数不支持列添加(Pyspark 版本 2.3.1) - 内置 python 的
sum
函数对某些人有效,但对其他人却出错(可能是因为名称冲突)
在您的第三种方法中,表达式(在 python 的 sum
函数内)返回 PySpark DataFrame。
因此,可以使用PySpark中的expr
函数实现多列的添加,该函数将要计算的表达式作为输入。
from pyspark.sql.functions import expr
cols_list = ['a', 'b', 'c']
# Creating an addition expression using `join`
expression = '+'.join(cols_list)
df = df.withColumn('sum_cols', expr(expression))
这为我们提供了所需的列总和。我们还可以使用任何其他复杂表达式来获得其他输出。