How to concat two array / list columns of different Spark DataFrames?
I need a DataFrame that concatenates the array columns coming from two different Spark DataFrames.
Looking for PySpark code.
df1.show()
+---------+
| value|
+---------+
|[1, 2, 3]|
+---------+
df2.show()
+------+
| value|
+------+
|[4, 5]|
+------+
I need a dataframe as below:
+------------+
| value |
+------------+
|[1,2,3,4,5] |
+------------+
Some education here as well: you can drop the .show() calls and simply generate some data first.
Spark 2.4 is assumed.
Relying on positional alignment is fine here; some dispute whether order is preserved with RDDs and zipWithIndex, but I have no evidence to doubt that it is.
No performance considerations in terms of explicit partitioning, and no UDFs are used.
Both DataFrames are assumed to have the same number of rows. Dataset is not a PySpark object, so an RDD conversion is needed to get zipWithIndex.
from pyspark.sql.functions import col, concat

# Generate some sample data: seven rows, each with a single array column 'value'.
df1 = spark.createDataFrame([([x, x + 1, x + 2],) for x in range(7)], ['value'])
df2 = spark.createDataFrame([([x + 10, x + 20],) for x in range(7)], ['value'])

# Attach a positional index to each DataFrame via zipWithIndex (needs the RDD API).
dfA = df1.rdd.map(lambda r: r.value).zipWithIndex().toDF(['value', 'index'])
dfB = df2.rdd.map(lambda r: r.value).zipWithIndex().toDF(['value', 'index'])

# Join the two DataFrames on that positional index.
df_inner_join = dfA.join(dfB, dfA.index == dfB.index)

# Rename all columns in one go; renaming the duplicated names individually causes issues otherwise.
new_names = ['value1', 'index1', 'value2', 'index2']
df_renamed = df_inner_join.toDF(*new_names)

# concat() applied to two array columns appends them (Spark 2.4+).
df_result = df_renamed.select(col("index1"), concat(col("value1"), col("value2")))
new_names_final = ['index', 'value']
df_result_final = df_result.toDF(*new_names_final)
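To reproduce the tables below, just display the generated inputs and the final result:

df1.show()
df2.show()
df_result_final.show()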
Data input (generated):
+---------+
| value|
+---------+
|[0, 1, 2]|
|[1, 2, 3]|
|[2, 3, 4]|
|[3, 4, 5]|
|[4, 5, 6]|
|[5, 6, 7]|
|[6, 7, 8]|
+---------+
+--------+
| value|
+--------+
|[10, 20]|
|[11, 21]|
|[12, 22]|
|[13, 23]|
|[14, 24]|
|[15, 25]|
|[16, 26]|
+--------+
Data output:
+-----+-----------------+
|index| value|
+-----+-----------------+
| 0|[0, 1, 2, 10, 20]|
| 6|[6, 7, 8, 16, 26]|
| 5|[5, 6, 7, 15, 25]|
| 1|[1, 2, 3, 11, 21]|
| 3|[3, 4, 5, 13, 23]|
| 2|[2, 3, 4, 12, 22]|
| 4|[4, 5, 6, 14, 24]|
+-----+-----------------+
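A minimal alternative sketch, assuming the current row order of each DataFrame is the positional order you want to align on (the same caveat as with zipWithIndex): the index can also be derived with row_number() over a window, which avoids the RDD round-trip. The dfA2 / dfB2 / df_alt names are just illustrative.

import pyspark.sql.functions as f
from pyspark.sql import Window

# Caution: ordering by monotonically_increasing_id() only reflects the current
# row order, and a window without partitionBy pulls all rows into one partition.
w = Window.orderBy(f.monotonically_increasing_id())

dfA2 = df1.withColumn('index', f.row_number().over(w) - 1)
dfB2 = df2.withColumn('index', f.row_number().over(w) - 1)

df_alt = (dfA2.join(dfB2, 'index')
               .select('index', f.concat(dfA2.value, dfB2.value).alias('value')))
df_alt.show()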