withColumn 中的用户定义函数只调用一次而不是每个 DF 行
User Defined Function in withColumn called just once rather than per DF row
我有一个用户定义的函数有问题,该函数是为连接一个数据帧的值而构建的,该数据帧与另一个数据帧的索引值相匹配。
这是我尝试匹配的简化数据帧:
a_df:
+-------+------+
| index | name |
+-------+------+
| 1 | aaa |
| 2 | bbb |
| 3 | ccc |
| 4 | ddd |
| 5 | eee |
+-------+------+
b_df:
+-------+------+
| index | code |
+-------+------+
| 1 | 101 |
| 2 | 102 |
| 3 | 101 |
| 3 | 102 |
| 4 | 103 |
| 4 | 104 |
| 5 | 101 |
+-------+------+
udf 函数和调用:
> def concatcodes(index, dataframe):
> res = dataframe.where(dataframe.index == index).collect()
> reslist = "|".join([value.code for value in res])
> return reslist
>
> spark.udf.register("concatcodes", concatcodes, StringType())
>
> resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))
我希望在 a_DF 数据帧的每一行调用该函数,从而产生以下输出:
+-------+------+-------+
| index | name |codes |
+-------+------+-------+
| 1 | aaa |101 |
| 2 | bbb |102 |
| 3 | ccc |101|102|
| 4 | ddd |103|104|
| 5 | eee |101 |
+-------+------+-------+
但是,该函数似乎只被调用一次,整个列作为其参数传递,导致以下输出:
+-------+------+---------------------------+
| index | name |codes |
+-------+------+---------------------------+
| 1 | aaa |101|102|101|102|103|104|101| |
| 2 | bbb |101|102|101|102|103|104|101|
| 3 | ccc |101|102|101|102|103|104|101|
| 4 | ddd |101|102|101|102|103|104|101|
| 5 | eee |101|102|101|102|103|104|101|
+-------+------+---------------------------+
我想我在 .withColum 方法中调用 UDF 时犯了根本性的错误,但我不知道是什么 - 我非常感谢有人指出我的逻辑有什么问题。
首先,你 for this. The heart of your question is essentially 和一个join
。以下将产生所需的输出:
from pyspark.sql.functions import collect_list, concat_ws
resultDF = a_df.join(
b_df.groupBy("index").agg(concat_ws("|", collect_list("code")).alias("code")),
on="index"
)
resultDF .show()
#+-----+----+-------+
#|index|name| code|
#+-----+----+-------+
#| 3| ccc|101|102|
#| 5| eee| 101|
#| 1| aaa| 101|
#| 4| ddd|103|104|
#| 2| bbb| 102|
#+-----+----+-------+
请记住,spark 数据帧本质上是无序的,除非您使用 sort
或 orderBy
.
显式引入顺序
要解决您尝试的问题:
I suppose I am doing something fundamentally wrong when it comes to calling UDF in the .withColum method but I could not figure out what
如果您查看代码的执行计划,您会发现 where(dataframe.index == index)
部分基本上被忽略了。
resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))
resultDF.explain()
#== Physical Plan ==
#*(1) Project [index#0, name#1, 101|102|101|102|103|104|101 AS codes#64]
#+- Scan ExistingRDD[index#0,name#1]
我怀疑这是由于 . You can't ,所以必须发生的是优化器 运行 collect
一次并将其用于所有行。
这里更大的问题是在 udf
中调用 collect
的方法违背了 spark 的目的(这是你的根本误解)。使用 spark 的全部意义在于将您的计算并行分布到多个执行程序。当您使用 collect
操作时,这会将所有数据带入驱动程序的本地内存中。 (在你的情况下,它似乎会被广播回执行者)。
相反,当您需要引用来自多个 spark DataFrame 的数据时,请使用 join
s。对于 udf
s,您可以将它们视为本质上仅用于对单个 spark DataFrame 的单个 Row
进行操作。
这是我的方法
df = pd.merge(a_df,b_df, on = "index")
df.groupby("index").agg({"name" : 'first', "code" : list})
结果是
index name code
1 aaa [101]
2 bbb [102]
3 ccc [101, 102]
4 ddd [103, 104]
5 eee [101]
我有一个用户定义的函数有问题,该函数是为连接一个数据帧的值而构建的,该数据帧与另一个数据帧的索引值相匹配。
这是我尝试匹配的简化数据帧:
a_df:
+-------+------+
| index | name |
+-------+------+
| 1 | aaa |
| 2 | bbb |
| 3 | ccc |
| 4 | ddd |
| 5 | eee |
+-------+------+
b_df:
+-------+------+
| index | code |
+-------+------+
| 1 | 101 |
| 2 | 102 |
| 3 | 101 |
| 3 | 102 |
| 4 | 103 |
| 4 | 104 |
| 5 | 101 |
+-------+------+
udf 函数和调用:
> def concatcodes(index, dataframe):
> res = dataframe.where(dataframe.index == index).collect()
> reslist = "|".join([value.code for value in res])
> return reslist
>
> spark.udf.register("concatcodes", concatcodes, StringType())
>
> resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))
我希望在 a_DF 数据帧的每一行调用该函数,从而产生以下输出:
+-------+------+-------+
| index | name |codes |
+-------+------+-------+
| 1 | aaa |101 |
| 2 | bbb |102 |
| 3 | ccc |101|102|
| 4 | ddd |103|104|
| 5 | eee |101 |
+-------+------+-------+
但是,该函数似乎只被调用一次,整个列作为其参数传递,导致以下输出:
+-------+------+---------------------------+
| index | name |codes |
+-------+------+---------------------------+
| 1 | aaa |101|102|101|102|103|104|101| |
| 2 | bbb |101|102|101|102|103|104|101|
| 3 | ccc |101|102|101|102|103|104|101|
| 4 | ddd |101|102|101|102|103|104|101|
| 5 | eee |101|102|101|102|103|104|101|
+-------+------+---------------------------+
我想我在 .withColum 方法中调用 UDF 时犯了根本性的错误,但我不知道是什么 - 我非常感谢有人指出我的逻辑有什么问题。
首先,你join
。以下将产生所需的输出:
from pyspark.sql.functions import collect_list, concat_ws
resultDF = a_df.join(
b_df.groupBy("index").agg(concat_ws("|", collect_list("code")).alias("code")),
on="index"
)
resultDF .show()
#+-----+----+-------+
#|index|name| code|
#+-----+----+-------+
#| 3| ccc|101|102|
#| 5| eee| 101|
#| 1| aaa| 101|
#| 4| ddd|103|104|
#| 2| bbb| 102|
#+-----+----+-------+
请记住,spark 数据帧本质上是无序的,除非您使用 sort
或 orderBy
.
要解决您尝试的问题:
I suppose I am doing something fundamentally wrong when it comes to calling UDF in the .withColum method but I could not figure out what
如果您查看代码的执行计划,您会发现 where(dataframe.index == index)
部分基本上被忽略了。
resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))
resultDF.explain()
#== Physical Plan ==
#*(1) Project [index#0, name#1, 101|102|101|102|103|104|101 AS codes#64]
#+- Scan ExistingRDD[index#0,name#1]
我怀疑这是由于 collect
一次并将其用于所有行。
这里更大的问题是在 udf
中调用 collect
的方法违背了 spark 的目的(这是你的根本误解)。使用 spark 的全部意义在于将您的计算并行分布到多个执行程序。当您使用 collect
操作时,这会将所有数据带入驱动程序的本地内存中。 (在你的情况下,它似乎会被广播回执行者)。
相反,当您需要引用来自多个 spark DataFrame 的数据时,请使用 join
s。对于 udf
s,您可以将它们视为本质上仅用于对单个 spark DataFrame 的单个 Row
进行操作。
这是我的方法
df = pd.merge(a_df,b_df, on = "index")
df.groupby("index").agg({"name" : 'first', "code" : list})
结果是
index name code
1 aaa [101]
2 bbb [102]
3 ccc [101, 102]
4 ddd [103, 104]
5 eee [101]