Pyspark Dataframe 将函数应用于两列
Pyspark Dataframe Apply function to two columns
假设我有两个 PySpark DataFrame df1
和 df2
。
df1= 'a'
1
2
5
df2= 'b'
3
6
我想为每个 df1['a']
找到最接近的 df2['b']
值,并将最接近的值添加为 df1
中的新列。
换句话说,对于 df1['a']
中的每个值 x
,我想找到一个 y
实现所有 y in df2['b']
的 min(abx(x-y))
(注意: 可以假设只有一个y
可以达到最小距离),结果就是
'a' 'b'
1 3
2 3
5 6
我尝试使用以下代码首先创建一个距离矩阵(在找到达到最小距离的值之前):
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def dict(x,y):
return abs(x-y)
udf_dict = udf(dict, IntegerType())
sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
这给出了
Column<PythonUDF#dist(a,b)>
然后我试了
sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
它永远运行而不给 error/output。
我的问题是:
- 由于我是 Spark 的新手,我构建输出 DataFrame 的方法是否有效? (我的方法是首先为所有
a
和 b
值创建一个距离矩阵,然后找到 min
一个)
- 我的代码的最后一行有什么问题以及如何修复它?
从你的第二个问题开始 - 你只能将 udf 应用于现有数据框,我想你正在考虑这样的事情:
>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show()
+---+---+--------+
| a| b|distance|
+---+---+--------+
| 1| 3| 2|
| 1| 6| 5|
| 2| 3| 1|
| 2| 6| 4|
| 5| 3| 2|
| 5| 6| 1|
+---+---+--------+
但是有一种更有效的方法来应用这个距离,通过使用内部 abs
:
>>> from pyspark.sql.functions import abs
>>> df1.join(df2).withColumn('distance', abs(df1.a -df2.b))
然后你可以通过计算找到匹配的数字:
>>> distances = df1.join(df2).withColumn('distance', abs(df1.a -df2.b))
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance'))
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show()
+---+---+
| a| b|
+---+---+
| 5| 6|
| 1| 3|
| 2| 3|
+---+---+
假设我有两个 PySpark DataFrame df1
和 df2
。
df1= 'a'
1
2
5
df2= 'b'
3
6
我想为每个 df1['a']
找到最接近的 df2['b']
值,并将最接近的值添加为 df1
中的新列。
换句话说,对于 df1['a']
中的每个值 x
,我想找到一个 y
实现所有 y in df2['b']
的 min(abx(x-y))
(注意: 可以假设只有一个y
可以达到最小距离),结果就是
'a' 'b'
1 3
2 3
5 6
我尝试使用以下代码首先创建一个距离矩阵(在找到达到最小距离的值之前):
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def dict(x,y):
return abs(x-y)
udf_dict = udf(dict, IntegerType())
sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
这给出了
Column<PythonUDF#dist(a,b)>
然后我试了
sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
它永远运行而不给 error/output。
我的问题是:
- 由于我是 Spark 的新手,我构建输出 DataFrame 的方法是否有效? (我的方法是首先为所有
a
和b
值创建一个距离矩阵,然后找到min
一个) - 我的代码的最后一行有什么问题以及如何修复它?
从你的第二个问题开始 - 你只能将 udf 应用于现有数据框,我想你正在考虑这样的事情:
>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show()
+---+---+--------+
| a| b|distance|
+---+---+--------+
| 1| 3| 2|
| 1| 6| 5|
| 2| 3| 1|
| 2| 6| 4|
| 5| 3| 2|
| 5| 6| 1|
+---+---+--------+
但是有一种更有效的方法来应用这个距离,通过使用内部 abs
:
>>> from pyspark.sql.functions import abs
>>> df1.join(df2).withColumn('distance', abs(df1.a -df2.b))
然后你可以通过计算找到匹配的数字:
>>> distances = df1.join(df2).withColumn('distance', abs(df1.a -df2.b))
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance'))
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show()
+---+---+
| a| b|
+---+---+
| 5| 6|
| 1| 3|
| 2| 3|
+---+---+