使用 Pyspark 合并两个数据帧
Merging two dataframes using Pyspark
我有2个DF要合并:
DF1 --> 包含股票
Plant Art_nr Tot
A X 5
B Y 4
DF2 --Z 包含未结交货
Plant Art_nr Tot
A X 1
C Z 3
我想获得一个 DF3,其中对于 Plant 和 Art_nr 的每个组合:
- 如果 DF1.Plant&Art_nr 和 DF2.Plant&Art_nr 之间存在匹配项,我会得到 DF1 和 DF2 之间的区别
- 如果 DF1.Plant&Art_nr 和 DF2.Plant&Art_nr 之间没有匹配项,我将保留 DF1 和 DF2
的原始值
DF3 -->
Plant Art_nr Total
A X 4
B Y 4
C Z 3
我在 DF1 和 DF2 中创建了一个 "Concat" 字段来连接 Plant 和 Art_nr 我尝试使用完整连接 + when + otherwise 但我找不到正确的语法
DF1.join(DF2, ["Concat"],"full").withColumn("Total",when(DF1.Concat.isin(DF2.Concat)), DF1.Tot - DF2.Tot).otherwise(when(not(DF1.Concat.isin(DF2.Concat)), DF1.Tot)).show()
关于我可以使用的替代功能或如何正确使用这些功能有什么建议吗?
使用 Udf,看似冗长但更清晰
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, array
def score(arr):
if arr[0] is None:
return int(arr[1])
elif arr[1] is None:
return int(arr[0])
return (int(arr[0])-int(arr[1]))
udf_final = udf(lambda arr: score(arr), IntegerType())
DF1.join(DF2, cond, "full").withColumn("final_score",udf_final(array("Tot","Total")))
您必须连接两个数据框,然后执行 case (If-Else)
表达式或 coalesce
函数。
这可以通过多种方式完成,这里有几个例子。
选项 1: 使用 coalesce
函数替代 CASE-WHEN-NULL
from pyspark.sql.functions import coalesce, lit,abs
cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]
df1.join(df2,cond,'full') \
.select(coalesce(df1.Plant,df2.Plant).alias('Plant')
,coalesce(df1.Art_nr,df2.Art_nr).alias('Art_nr')
,abs(coalesce(df1.Tot,lit(0)) - coalesce(df2.Tot,lit(0))).alias('Tot')
).show()
选项 2: 在 selectExpr()
中使用 case
表达式
cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]
df1.alias('a').join(df2.alias('b'),cond,'full') \
.selectExpr("CASE WHEN a.Plant IS NULL THEN b.Plant ELSE a.Plant END AS Plant",
"CASE WHEN a.Art_nr IS NULL THEN b.Art_nr ELSE a.Art_nr END AS Art_nr",
"abs(coalesce(a.Tot,0) - coalesce(b.Tot,0)) AS Tot") \
.show()
#+-----+------+---+
#|Plant|Art_nr|Tot|
#+-----+------+---+
#| A| X| 4|
#| B| Y| 4|
#| C| Z| 3|
#+-----+------+---+
选项 3: 使用 when().otherwise()
from pyspark.sql.functions import when,coalesce, lit,abs
cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]
df1.join(df2,cond,'full') \
.select(when(df1.Plant.isNull(),df2.Plant).otherwise(df1.Plant).alias('Plant')
,when(df1.Art_nr.isNull(),df2.Art_nr).otherwise(df1.Art_nr).alias('Art_nr')
,abs(coalesce(df1.Tot,lit(0)) - coalesce(df2.Tot,lit(0))).alias('Tot')
).show()
输出:
#+-----+------+---+
#|Plant|Art_nr|Tot|
#+-----+------+---+
#| A| X| 4|
#| B| Y| 4|
#| C| Z| 3|
#+-----+------+---+
我可能会使用 groupBy 进行联合并进行一些重新格式化以避免使用 UDF 并且没有大块代码。
from pyspark.sql.functions import *
DF3 = DF1.union(DF2.withColumn("Tot", col("Tot") * (-1)))
DF3 = DF3.groupBy("Plant", "Art_nr").agg(sum("Tot").alias("Tot"))
DF3 = DF3.withColumn("Tot", abs(col("Tot")))
我不能 100% 确定是否没有我没有考虑到的副作用以及它是否符合您的需求。
我有2个DF要合并:
DF1 --> 包含股票
Plant Art_nr Tot
A X 5
B Y 4
DF2 --Z 包含未结交货
Plant Art_nr Tot
A X 1
C Z 3
我想获得一个 DF3,其中对于 Plant 和 Art_nr 的每个组合: - 如果 DF1.Plant&Art_nr 和 DF2.Plant&Art_nr 之间存在匹配项,我会得到 DF1 和 DF2 之间的区别 - 如果 DF1.Plant&Art_nr 和 DF2.Plant&Art_nr 之间没有匹配项,我将保留 DF1 和 DF2
的原始值DF3 -->
Plant Art_nr Total
A X 4
B Y 4
C Z 3
我在 DF1 和 DF2 中创建了一个 "Concat" 字段来连接 Plant 和 Art_nr 我尝试使用完整连接 + when + otherwise 但我找不到正确的语法
DF1.join(DF2, ["Concat"],"full").withColumn("Total",when(DF1.Concat.isin(DF2.Concat)), DF1.Tot - DF2.Tot).otherwise(when(not(DF1.Concat.isin(DF2.Concat)), DF1.Tot)).show()
关于我可以使用的替代功能或如何正确使用这些功能有什么建议吗?
使用 Udf,看似冗长但更清晰
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, array
def score(arr):
if arr[0] is None:
return int(arr[1])
elif arr[1] is None:
return int(arr[0])
return (int(arr[0])-int(arr[1]))
udf_final = udf(lambda arr: score(arr), IntegerType())
DF1.join(DF2, cond, "full").withColumn("final_score",udf_final(array("Tot","Total")))
您必须连接两个数据框,然后执行 case (If-Else)
表达式或 coalesce
函数。
这可以通过多种方式完成,这里有几个例子。
选项 1: 使用 coalesce
函数替代 CASE-WHEN-NULL
from pyspark.sql.functions import coalesce, lit,abs
cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]
df1.join(df2,cond,'full') \
.select(coalesce(df1.Plant,df2.Plant).alias('Plant')
,coalesce(df1.Art_nr,df2.Art_nr).alias('Art_nr')
,abs(coalesce(df1.Tot,lit(0)) - coalesce(df2.Tot,lit(0))).alias('Tot')
).show()
选项 2: 在 selectExpr()
case
表达式
cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]
df1.alias('a').join(df2.alias('b'),cond,'full') \
.selectExpr("CASE WHEN a.Plant IS NULL THEN b.Plant ELSE a.Plant END AS Plant",
"CASE WHEN a.Art_nr IS NULL THEN b.Art_nr ELSE a.Art_nr END AS Art_nr",
"abs(coalesce(a.Tot,0) - coalesce(b.Tot,0)) AS Tot") \
.show()
#+-----+------+---+
#|Plant|Art_nr|Tot|
#+-----+------+---+
#| A| X| 4|
#| B| Y| 4|
#| C| Z| 3|
#+-----+------+---+
选项 3: 使用 when().otherwise()
from pyspark.sql.functions import when,coalesce, lit,abs
cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]
df1.join(df2,cond,'full') \
.select(when(df1.Plant.isNull(),df2.Plant).otherwise(df1.Plant).alias('Plant')
,when(df1.Art_nr.isNull(),df2.Art_nr).otherwise(df1.Art_nr).alias('Art_nr')
,abs(coalesce(df1.Tot,lit(0)) - coalesce(df2.Tot,lit(0))).alias('Tot')
).show()
输出:
#+-----+------+---+
#|Plant|Art_nr|Tot|
#+-----+------+---+
#| A| X| 4|
#| B| Y| 4|
#| C| Z| 3|
#+-----+------+---+
我可能会使用 groupBy 进行联合并进行一些重新格式化以避免使用 UDF 并且没有大块代码。
from pyspark.sql.functions import *
DF3 = DF1.union(DF2.withColumn("Tot", col("Tot") * (-1)))
DF3 = DF3.groupBy("Plant", "Art_nr").agg(sum("Tot").alias("Tot"))
DF3 = DF3.withColumn("Tot", abs(col("Tot")))
我不能 100% 确定是否没有我没有考虑到的副作用以及它是否符合您的需求。