在 pyspark 中对列表中的不同数据框列求和的正确方法是什么?

Whats is the correct way to sum different dataframe columns in a list in pyspark?

我想对 spark 数据框中的不同列求和。

代码

from pyspark.sql import functions as F
cols = ["A.p1","B.p1"]
df = spark.createDataFrame([[1,2],[4,89],[12,60]],schema=cols)

# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))

#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))

#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))

为什么不是方法 2。 & #3。不工作? 我在使用 Spark 2.2

因为,

# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))

这里你使用的是 python 内置的求和函数,它将可迭代作为输入,所以它可以工作。 https://docs.python.org/2/library/functions.html#sum

#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))

此处您使用的是 pyspark sum 函数,该函数将列作为输入,但您正试图在行级别获取它。 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.sum

#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))

在这里,df.select() returns 一个数据帧并尝试对一个数据帧求和。在这种情况下,我认为,您必须逐行迭代并对它应用求和。

TL;DR builtins.sum 就好了。


关注你的

Using native python sum() is not benefitting from spark optimization. so whats the spark way of doing it

its not a pypark function so it wont be really be completely benefiting from spark right.

我可以看出你做出了错误的假设。

我们来分解问题:

[df[col] for col in ["`A.p1`","`B.p1`"]]

创建 Columns 的列表:

[Column<b'A.p1'>, Column<b'B.p1'>]

我们称它为iterable

sum 通过获取此列表的元素并调用 __add__ 方法 (+) 来减少输出。命令等价物是:

accum = iterable[0]
for element in iterable[1:]:
    accum = accum + element

这给出 Column:

Column<b'(A.p1 + B.p1)'>

这与调用

相同
df["`A.p1`"] + df["`B.p1`"]

未触及任何数据,评估时它受益于所有 Spark 优化。

将列表中的多列添加到一列中

我尝试了很多方法,以下是我的观察:

  1. PySpark 的 sum 函数不支持列添加(Pyspark 版本 2.3.1)
  2. 内置 python 的 sum 函数对某些人有效,但对其他人却出错(可能是因为名称冲突)

在您的第三种方法中,表达式(在 python 的 sum 函数内)返回 PySpark DataFrame。

因此,可以使用PySpark中的expr函数实现多列的添加,该函数将要计算的表达式作为输入。

from pyspark.sql.functions import expr

cols_list = ['a', 'b', 'c']

# Creating an addition expression using `join`
expression = '+'.join(cols_list)

df = df.withColumn('sum_cols', expr(expression))

这为我们提供了所需的列总和。我们还可以使用任何其他复杂表达式来获得其他输出。