如何将第二个数据帧的列传递到 PySpark 1.6.1 中的 UDF
How to pass the column of a second dataframe into a UDF in PySpark 1.6.1
这就是我想要做的。我想对两个不同数据框中两列的每个条目进行比较。数据框如下所示:
>>> subject_df.show()
+------+-------------+
|USERID| FULLNAME|
+------+-------------+
| 12345| steve james|
| 12346| steven smith|
| 43212|bill dunnigan|
+------+-------------+
>>> target_df.show()
+------+-------------+
|USERID| FULLNAME|
+------+-------------+
|111123| steve tyler|
|422226| linda smith|
|123333|bill dunnigan|
| 56453| steve smith|
+------+-------------+
这是我尝试使用的逻辑:
# CREATE FUNCTION
def string_match(subject, targets):
for target in targets:
<logic>
return logic_result
# CREATE UDF
string_match_udf = udf(string_match, IntegerType())
# APPLY UDF
subject_df.select(subject_df.FULLNAME, string_match_udf(subject_df.FULLNAME, target_df.FULLNAME).alias("score"))
这是我在 运行 pyspark shell 中的代码时得到的错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o45.select.
: java.lang.RuntimeException: Invalid PythonUDF PythonUDF#string_match(FULLNAME#2,FULLNAME#5), requires attributes from more than one child.
我认为问题的根源在于试图将第二列传递给函数。我应该使用 RDD 吗?请记住,实际的 subject_df 和 target_df 都超过 100,000 行。我愿意接受任何建议。
您似乎误解了用户定义函数的工作原理:
- 函数当时只接收一行的值
- 您不能使用来自不相关
DataFame
的数据。
做你想做的唯一方法是取笛卡尔积。
subject_df.join(target_df).select(
f(subject_df.FULLNAME, target_df.FULLNAME)
)
其中f
是一个比较当时两个元素的函数
这就是我想要做的。我想对两个不同数据框中两列的每个条目进行比较。数据框如下所示:
>>> subject_df.show()
+------+-------------+
|USERID| FULLNAME|
+------+-------------+
| 12345| steve james|
| 12346| steven smith|
| 43212|bill dunnigan|
+------+-------------+
>>> target_df.show()
+------+-------------+
|USERID| FULLNAME|
+------+-------------+
|111123| steve tyler|
|422226| linda smith|
|123333|bill dunnigan|
| 56453| steve smith|
+------+-------------+
这是我尝试使用的逻辑:
# CREATE FUNCTION
def string_match(subject, targets):
for target in targets:
<logic>
return logic_result
# CREATE UDF
string_match_udf = udf(string_match, IntegerType())
# APPLY UDF
subject_df.select(subject_df.FULLNAME, string_match_udf(subject_df.FULLNAME, target_df.FULLNAME).alias("score"))
这是我在 运行 pyspark shell 中的代码时得到的错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o45.select.
: java.lang.RuntimeException: Invalid PythonUDF PythonUDF#string_match(FULLNAME#2,FULLNAME#5), requires attributes from more than one child.
我认为问题的根源在于试图将第二列传递给函数。我应该使用 RDD 吗?请记住,实际的 subject_df 和 target_df 都超过 100,000 行。我愿意接受任何建议。
您似乎误解了用户定义函数的工作原理:
- 函数当时只接收一行的值
- 您不能使用来自不相关
DataFame
的数据。
做你想做的唯一方法是取笛卡尔积。
subject_df.join(target_df).select(
f(subject_df.FULLNAME, target_df.FULLNAME)
)
其中f
是一个比较当时两个元素的函数