合并两个 PySpark DataFrame 会产生意想不到的结果

Merging two PySpark DataFrame's gives unexpected results

我有两个 PySpark DataFrames(不是 pandas):

df1 =

    +----------+--------------+-----------+---------+
    |pk        |num_id        |num_pk     |qty_users|
    +----------+--------------+-----------+---------+
    |  63479840|      12556940|     298620|       13|
    |  63480030|      12557110|     298620|        9|
    |  63835520|      12627890|     299750|        8|

df2 =

    +----------+--------------+-----------+----------+
    |pk2       |num_id2       |num_pk2    |qty_users2|
    +----------+--------------+-----------+----------+
    |  63479800|      11156940|     298620|       10 |
    |  63480030|      12557110|     298620|        1 |
    |  63835520|      12627890|     299750|        2 |

我想加入两个 DataFrame 以获得一个 DataFrame df:

    +----------+--------------+-----------+---------+
    |pk        |num_id        |num_pk     |total    |
    +----------+--------------+-----------+---------+
    |  63479840|      12556940|     298620|       13|
    |  63479800|      11156940|     298620|       10|
    |  63480030|      12557110|     298620|       10|
    |  63835520|      12627890|     299750|       10|

合并的唯一条件是我想对 df1 和 [=17= 中具有相同 < pk, num_id, num_pk > 值的那些行求和 qty_users 的值].就像我在上面的例子中展示的那样。

我该怎么做?

更新:

这是我所做的:

newdf = df1.join(df2,(df1.pk==df2.pk2) & (df1.num_pk==df2.num_pk2) & (df1.num_id==df2.num_id2),'outer')

newdf = newdf.withColumn('total', sum(newdf[col] for col in ["qty_users","qty_users2"]))

但它给了我 9 列而不是 4 列。如何解决这个问题?

这个输出是你想要的吗?

df3 = pd.concat([df1, df2], as_index=False).groupby(['pk','num_id','num_pk'])['qty_users'].sum()

您的 2 个数据帧的合并是通过 pd.concat([df1, df2], as_index=False)

实现的

当所有其他列都相同时求 qty_users 列的总和首先需要按这些列分组

groupby(['pk','num_id','num_pk'])

然后找到 qty_users

的分组总和
['qty_users'].sum()

外部联接将 return 来自 tables.Also 的所有列,我们必须在 qty_users 中填充空值,因为总和也将 return 为空。

最后,我们可以select使用coalsece函数,

from pyspark.sql import functions as F

newdf = df1.join(df2,(df1.pk==df2.pk2) & (df1.num_pk==df2.num_pk2) & (df1.num_id==df2.num_id2),'outer').fillna(0,subset=["qty_users","qty_users2"])

newdf = newdf.withColumn('total', sum(newdf[col] for col in ["qty_users","qty_users2"]))

newdf.select(*[F.coalesce(c1,c2).alias(c1) for c1,c2 in zip(df1.columns,df2.columns)][:-1]+['total']).show()

+--------+--------+------+-----+
|      pk|  num_id|num_pk|total|
+--------+--------+------+-----+
|63479840|12556940|298620|   13|
|63480030|12557110|298620|   10|
|63835520|12627890|299750|   10|
|63479800|11156940|298620|   10|
+--------+--------+------+-----+

希望对您有所帮助!