使用 Pyspark 合并两个数据帧

Merging two dataframes using Pyspark

我有2个DF要合并:

DF1 --> 包含股票

Plant   Art_nr    Tot
A        X         5
B        Y         4

DF2 --Z 包含未结交货

Plant    Art_nr   Tot
A        X        1
C        Z        3

我想获得一个 DF3,其中对于 Plant 和 Art_nr 的每个组合: - 如果 DF1.Plant&Art_nr 和 DF2.Plant&Art_nr 之间存在匹配项,我会得到 DF1 和 DF2 之间的区别 - 如果 DF1.Plant&Art_nr 和 DF2.Plant&Art_nr 之间没有匹配项,我将保留 DF1 和 DF2

的原始值

DF3 -->

Plant    Art_nr   Total
A        X        4
B        Y        4
C        Z        3

我在 DF1 和 DF2 中创建了一个 "Concat" 字段来连接 Plant 和 Art_nr 我尝试使用完整连接 + when + otherwise 但我找不到正确的语法

DF1.join(DF2, ["Concat"],"full").withColumn("Total",when(DF1.Concat.isin(DF2.Concat)), DF1.Tot - DF2.Tot).otherwise(when(not(DF1.Concat.isin(DF2.Concat)), DF1.Tot)).show()

关于我可以使用的替代功能或如何正确使用这些功能有什么建议吗?

使用 Udf,看似冗长但更清晰

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, array

def score(arr):
    if arr[0] is None:
        return int(arr[1])
    elif arr[1] is None:
        return int(arr[0])
    return (int(arr[0])-int(arr[1]))

udf_final = udf(lambda arr: score(arr), IntegerType())

DF1.join(DF2, cond, "full").withColumn("final_score",udf_final(array("Tot","Total")))

您必须连接两个数据框,然后执行 case (If-Else) 表达式或 coalesce 函数。

这可以通过多种方式完成,这里有几个例子。

选项 1: 使用 coalesce 函数替代 CASE-WHEN-NULL

from pyspark.sql.functions import coalesce, lit,abs

cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]

df1.join(df2,cond,'full')  \
.select(coalesce(df1.Plant,df2.Plant).alias('Plant')
       ,coalesce(df1.Art_nr,df2.Art_nr).alias('Art_nr')
       ,abs(coalesce(df1.Tot,lit(0)) - coalesce(df2.Tot,lit(0))).alias('Tot')
       ).show()

选项 2:selectExpr()

中使用 case 表达式
cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]

df1.alias('a').join(df2.alias('b'),cond,'full')  \
.selectExpr("CASE WHEN a.Plant IS NULL THEN b.Plant ELSE a.Plant END AS Plant",
            "CASE WHEN a.Art_nr IS NULL THEN b.Art_nr ELSE a.Art_nr END AS Art_nr",
            "abs(coalesce(a.Tot,0) - coalesce(b.Tot,0))  AS Tot") \
.show()

#+-----+------+---+
#|Plant|Art_nr|Tot|
#+-----+------+---+
#|    A|     X|  4|
#|    B|     Y|  4|
#|    C|     Z|  3|
#+-----+------+---+

选项 3: 使用 when().otherwise()

from pyspark.sql.functions import when,coalesce, lit,abs

cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]

df1.join(df2,cond,'full')  \
.select(when(df1.Plant.isNull(),df2.Plant).otherwise(df1.Plant).alias('Plant')
       ,when(df1.Art_nr.isNull(),df2.Art_nr).otherwise(df1.Art_nr).alias('Art_nr')
       ,abs(coalesce(df1.Tot,lit(0)) - coalesce(df2.Tot,lit(0))).alias('Tot')
       ).show()

输出:

#+-----+------+---+
#|Plant|Art_nr|Tot|
#+-----+------+---+
#|    A|     X|  4|
#|    B|     Y|  4|
#|    C|     Z|  3|
#+-----+------+---+

我可能会使用 groupBy 进行联合并进行一些重新格式化以避免使用 UDF 并且没有大块代码。

from pyspark.sql.functions import *

DF3 = DF1.union(DF2.withColumn("Tot", col("Tot") * (-1)))
DF3 = DF3.groupBy("Plant", "Art_nr").agg(sum("Tot").alias("Tot"))
DF3 = DF3.withColumn("Tot", abs(col("Tot")))

我不能 100% 确定是否没有我没有考虑到的副作用以及它是否符合您的需求。