加入两个数据帧并存储在新数据帧中
Joining two data frames and storing in new data frame
我有两个 Spark 数据帧。
数据框 A:
Col_A1 Col_A2
1 ["x", "y", "z"]
2 ["a", "x", "y"]
3 ["a", "b", "c"]
数据框 B:
Col_B1
"x"
"a"
"y"
我想检查数据帧 A 的哪些条目在其 Col_A2 中具有数据帧 B 的 "x"
并且它 return 它本身作为新数据帧。我想对数据框 B 的其余条目重复执行相同的操作。
输出需要类似于:
数据帧A_x:
Col_A1 Col_A2
1 ["x", "y", "z"]
2 ["a", "x", "y"]
数据帧A_a:
Col_A1 Col_A2
2 ["a", "x", "y"]
3 ["a", "b", "c"]
数据框A_y
Col_A1 Col_A2
1 ["x", "y", "z"]
2 ["a", "x", "y"]
我尝试使用 udfs 和 map 函数,但并没有真正得到我想要的东西。
提前致谢。
如果你的数据框B很小,可以收集到一个列表中,加上它的不同值的数量是小,您可以为其每个元素编写一个简单的 UDF [更新:请参阅 post 的结尾以获取更简单的方法];这是 'x'
:
的示例
spark.version
# u'2.2.0'
from pyspark.sql import Row
df_a = spark.createDataFrame([Row(1, ["x", "y", "z"]),
Row(2, ["a", "x", "y"]),
Row(3, ["a", "b", "c"])],
["col_A1", "col_A2"])
@udf('boolean')
def x_isin(v):
if 'x' in v:
return True
else:
return False
temp_x = df_a.withColumn('x_isin', x_isin(df_a.col_A2))
temp_x.show()
# +------+---------+------+
# |col_A1| col_A2|x_isin|
# +------+---------+------+
# | 1|[x, y, z]| true|
# | 2|[a, x, y]| true|
# | 3|[a, b, c]| false|
# +------+---------+------+
df_a_x = temp_x.filter(temp_x.x_isin==True).drop('x_isin')
df_a_x.show()
# +------+---------+
# |col_A1| col_A2|
# +------+---------+
# | 1|[x, y, z]|
# | 2|[a, x, y]|
# +------+---------+
更新(在玛丽发表评论后):
感谢 Marie 指出 array_contains
函数 - 现在您确实不需要 UDF 来构建 temp_x
:
import pyspark.sql.functions as func
temp_x = df_a.withColumn('x_isin', func.array_contains(df_a.col_A2, 'x'))
temp_x.show() # same result as shown above
我有两个 Spark 数据帧。
数据框 A:
Col_A1 Col_A2
1 ["x", "y", "z"]
2 ["a", "x", "y"]
3 ["a", "b", "c"]
数据框 B:
Col_B1
"x"
"a"
"y"
我想检查数据帧 A 的哪些条目在其 Col_A2 中具有数据帧 B 的 "x"
并且它 return 它本身作为新数据帧。我想对数据框 B 的其余条目重复执行相同的操作。
输出需要类似于:
数据帧A_x:
Col_A1 Col_A2
1 ["x", "y", "z"]
2 ["a", "x", "y"]
数据帧A_a:
Col_A1 Col_A2
2 ["a", "x", "y"]
3 ["a", "b", "c"]
数据框A_y
Col_A1 Col_A2
1 ["x", "y", "z"]
2 ["a", "x", "y"]
我尝试使用 udfs 和 map 函数,但并没有真正得到我想要的东西。 提前致谢。
如果你的数据框B很小,可以收集到一个列表中,加上它的不同值的数量是小,您可以为其每个元素编写一个简单的 UDF [更新:请参阅 post 的结尾以获取更简单的方法];这是 'x'
:
spark.version
# u'2.2.0'
from pyspark.sql import Row
df_a = spark.createDataFrame([Row(1, ["x", "y", "z"]),
Row(2, ["a", "x", "y"]),
Row(3, ["a", "b", "c"])],
["col_A1", "col_A2"])
@udf('boolean')
def x_isin(v):
if 'x' in v:
return True
else:
return False
temp_x = df_a.withColumn('x_isin', x_isin(df_a.col_A2))
temp_x.show()
# +------+---------+------+
# |col_A1| col_A2|x_isin|
# +------+---------+------+
# | 1|[x, y, z]| true|
# | 2|[a, x, y]| true|
# | 3|[a, b, c]| false|
# +------+---------+------+
df_a_x = temp_x.filter(temp_x.x_isin==True).drop('x_isin')
df_a_x.show()
# +------+---------+
# |col_A1| col_A2|
# +------+---------+
# | 1|[x, y, z]|
# | 2|[a, x, y]|
# +------+---------+
更新(在玛丽发表评论后):
感谢 Marie 指出 array_contains
函数 - 现在您确实不需要 UDF 来构建 temp_x
:
import pyspark.sql.functions as func
temp_x = df_a.withColumn('x_isin', func.array_contains(df_a.col_A2, 'x'))
temp_x.show() # same result as shown above