加入行数不等的 PySpark 数据帧

Join PySpark dataframes with unequal numbers of rows

我有两个 PySpark 数据框,如下所示

首先是 df1,如下所示:

+-----+-----+----------+-----+
| name| type|timestamp1|score|
+-----+-----+----------+-----+
|name1|type1|2012-01-10|   11|
|name2|type1|2012-01-10|   14|
|name3|type2|2012-01-10|    2|
|name3|type2|2012-01-17|    3|
|name1|type1|2012-01-18|   55|
|name1|type1|2012-01-19|   10|
+-----+-----+----------+-----+

第二个是 df2,如下所示:

+-----+-------------------+-------+-------+
| name|         timestamp2|string1|string2|
+-----+-------------------+-------+-------+
|name1|2012-01-10 00:00:00|      A|     aa|
|name2|2012-01-10 00:00:00|      A|     bb|
|name3|2012-01-10 00:00:00|      C|     cc|
|name4|2012-01-17 00:00:00|      D|     dd|
|name3|2012-01-10 00:00:00|      C|     cc|
|name2|2012-01-17 00:00:00|      A|     bb|
|name2|2012-01-17 00:00:00|      A|     bb|
|name4|2012-01-10 00:00:00|      D|     dd|
|name3|2012-01-17 00:00:00|      C|     cc|
+-----+-------------------+-------+-------+

这两个数据框有一个公共列, namedf2name 的每个唯一值都有 string1string2.

的唯一值

我想加入 df1df2 并形成一个新的数据框 df3 这样 df3 包含 df1 的所有行(相同的结构, 行数为 df1) 但将 string1string2 列(来自 df2)的值分配给 df1name 的适当值.以下是我希望组合数据框 (df3) 的样子。

+-----+-----+----------+-----+-------+-------+
| name| type|timestamp1|score|string1|string2|
+-----+-----+----------+-----+-------+-------+
|name1|type1|2012-01-10|   11|      A|     aa|
|name2|type1|2012-01-10|   14|      A|     bb|
|name3|type2|2012-01-10|    2|      C|     cc|
|name3|type2|2012-01-17|    3|      C|     cc|
|name1|type1|2012-01-18|   55|      A|     aa|
|name1|type1|2012-01-19|   10|      A|     aa|
+-----+-----+----------+-----+-------+-------+

我怎样才能得到上面提到的数据帧(df3)?

我尝试了以下 df3 = df1.join( df2.select("name", "string1", "string2") , on=["name"], how="left")。但这给了我一个包含 14 行的数据框,其中包含多个(重复的)行条目。

您可以使用下面提到的代码生成 df1df2

from pyspark.sql import *
import pyspark.sql.functions as F

df1_Stats = Row("name", "type", "timestamp1", "score")

df1_stat1 = df1_Stats('name1', 'type1', "2012-01-10", 11)
df1_stat2 = df1_Stats('name2', 'type1', "2012-01-10", 14)
df1_stat3 = df1_Stats('name3', 'type2', "2012-01-10", 2)
df1_stat4 = df1_Stats('name3', 'type2', "2012-01-17", 3)
df1_stat5 = df1_Stats('name1', 'type1', "2012-01-18", 55)
df1_stat6 = df1_Stats('name1', 'type1', "2012-01-19", 10)

df1_stat_lst = [df1_stat1 , df1_stat2, df1_stat3, df1_stat4, df1_stat5, df1_stat6]

df1 = spark.createDataFrame(df1_stat_lst)

df2_Stats = Row("name", "timestamp2", "string1", "string2")

df2_stat1 = df2_Stats("name1", "2012-01-10 00:00:00", "A", "aa")
df2_stat2 = df2_Stats("name2", "2012-01-10 00:00:00", "A", "bb")
df2_stat3 = df2_Stats("name3", "2012-01-10 00:00:00", "C", "cc")
df2_stat4 = df2_Stats("name4", "2012-01-17 00:00:00", "D", "dd")
df2_stat5 = df2_Stats("name3", "2012-01-10 00:00:00", "C", "cc")
df2_stat6 = df2_Stats("name2", "2012-01-17 00:00:00", "A", "bb")
df2_stat7 = df2_Stats("name2", "2012-01-17 00:00:00", "A", "bb")
df2_stat8 = df2_Stats("name4", "2012-01-10 00:00:00", "D", "dd")
df2_stat9 = df2_Stats("name3", "2012-01-17 00:00:00", "C", "cc")

df2_stat_lst = [
    df2_stat1,
    df2_stat2,
    df2_stat3,
    df2_stat4,
    df2_stat5,
    df2_stat6,
    df2_stat7,
    df2_stat8,
    df2_stat9,
]

df2 = spark.createDataFrame(df2_stat_lst)

显然可以使用以下技术:

df3 = df1.join(
    df2.select("name", "string1", "string2"), on=["name"], how="left"
).dropDuplicates()
df3.show()

+-----+-----+----------+-----+-------+-------+
| name| type| timestamp|score|string1|string2|
+-----+-----+----------+-----+-------+-------+
|name2|type1|2012-01-10|   14|      A|     bb|
|name3|type2|2012-01-10|    2|      C|     cc|
|name1|type1|2012-01-18|   55|      A|     aa|
|name1|type1|2012-01-10|   11|      A|     aa|
|name3|type2|2012-01-17|    3|      C|     cc|
|name1|type1|2012-01-19|   10|      A|     aa|
+-----+-----+----------+-----+-------+-------+

我仍然愿意回答。所以,如果您有更有效的回答问题的方法,请随时留下您的答案。

最好在加入之前删除重复项,使小 table 加入。

 df3 = df1.join(df2.select("name", "string1", "string2").distinct(),on=["name"] , how="left")