在 PySpark 中合并两个数据框
Merge two dataframes in PySpark
我有两个数据帧,DF1 和 DF2,DF1 是主数据帧,它存储来自 DF2 的任何附加信息。
假设 DF1 具有以下格式,
Item Id | item | count
---------------------------
1 | item 1 | 2
2 | item 2 | 3
1 | item 3 | 2
3 | item 4 | 5
DF2 包含 DF1 中已有的 2 个项目和两个新条目。 (itemId和item被认为是一个组,可以作为join的key)
Item Id | item | count
---------------------------
1 | item 1 | 2
3 | item 4 | 2
4 | item 4 | 4
5 | item 5 | 2
我需要合并这两个数据框,以便增加现有项目计数并插入新项目。
结果应该是这样的:
Item Id | item | count
---------------------------
1 | item 1 | 4
2 | item 2 | 3
1 | item 3 | 2
3 | item 4 | 7
4 | item 4 | 4
5 | item 5 | 2
我有一种方法可以做到这一点,但不确定它是否有效或正确的方法
temp1 = df1.join(temp,['item_id','item'],'full_outer') \
.na.fill(0)
temp1\
.groupby("item_id", "item")\
.agg(F.sum(temp1["count"] + temp1["newcount"]))\
.show()
有几种方法可以做到这一点。
根据您的描述,最直接的解决方案是使用 RDD - SparkContext.union
:
rdd1 = sc.parallelize(DF1)
rdd2 = sc.parallelize(DF2)
union_rdd = sc.union([rdd1, rdd2])
替代解决方案是使用 pyspark.sql
中的 DataFrame.union
注意:我之前建议过unionAll
,但在 Spark 2.0 中已弃用
因为这两个数据帧的架构相同,您可以执行 union
然后执行 groupby
id 和 aggregate
计数。
step1: df3 = df1.union(df2);
step2: df3.groupBy("Item Id", "item").agg(sum("count").as("count"));
推荐@wandermonk 的解决方案,因为它不使用连接。尽可能避免连接,因为这会触发洗牌(也称为广泛转换并导致通过网络传输数据,这既昂贵又缓慢)
您还必须查看您的数据大小(两个表都很大或一大一小等),因此您可以调整它的性能方面。
我尝试通过使用 SparkSQL 的解决方案向小组展示,因为他们做同样的事情但更容易理解和操作。
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
list_1 = [[1,"item 1" , 2],[2 ,"item 2", 3],[1 ,"item 3" ,2],[3 ,"item 4" , 5]]
list_2 = [[1,"item 1",2],[3 ,"item 4",2],[4 ,"item 4",4],[5 ,"item 5",2]]
my_schema = StructType([StructField("Item_ID",IntegerType(), True),StructField("Item_Name",StringType(), True ),StructField("Quantity",IntegerType(), True)])
df1 = spark.createDataFrame(list_1, my_schema)
df2 = spark.createDataFrame(list_2, my_schema)
df1.createOrReplaceTempView("df1")
df1.createOrReplaceTempView("df2")
df3 = df2.union(df1)
df3.createOrReplaceTempView("df3")
df4 = spark.sql("select Item_ID, Item_Name, sum(Quantity) as Quantity from df3 group by Item_ID, Item_Name")
df4.show(10)
现在,如果您查看 SparkUI,您可以看到对于如此小的数据集、洗牌操作和阶段数。
这么小的工作的阶段数
通过命令对本组的随机操作进行编号
我还建议查看 SQL 计划并了解费用。交换代表这里的洗牌。
== Physical Plan ==
*(2) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, Quantity#32L])
+- Exchange hashpartitioning(Item_ID#6, Item_Name#7, 200)
+- *(1) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[partial_sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, sum#38L])
+- Union
:- Scan ExistingRDD[Item_ID#6,Item_Name#7,Quantity#8]
+- Scan ExistingRDD[Item_ID#0,Item_Name#1,Quantity#2]
我有两个数据帧,DF1 和 DF2,DF1 是主数据帧,它存储来自 DF2 的任何附加信息。
假设 DF1 具有以下格式,
Item Id | item | count
---------------------------
1 | item 1 | 2
2 | item 2 | 3
1 | item 3 | 2
3 | item 4 | 5
DF2 包含 DF1 中已有的 2 个项目和两个新条目。 (itemId和item被认为是一个组,可以作为join的key)
Item Id | item | count
---------------------------
1 | item 1 | 2
3 | item 4 | 2
4 | item 4 | 4
5 | item 5 | 2
我需要合并这两个数据框,以便增加现有项目计数并插入新项目。
结果应该是这样的:
Item Id | item | count
---------------------------
1 | item 1 | 4
2 | item 2 | 3
1 | item 3 | 2
3 | item 4 | 7
4 | item 4 | 4
5 | item 5 | 2
我有一种方法可以做到这一点,但不确定它是否有效或正确的方法
temp1 = df1.join(temp,['item_id','item'],'full_outer') \
.na.fill(0)
temp1\
.groupby("item_id", "item")\
.agg(F.sum(temp1["count"] + temp1["newcount"]))\
.show()
有几种方法可以做到这一点。
根据您的描述,最直接的解决方案是使用 RDD - SparkContext.union
:
rdd1 = sc.parallelize(DF1)
rdd2 = sc.parallelize(DF2)
union_rdd = sc.union([rdd1, rdd2])
替代解决方案是使用 pyspark.sql
DataFrame.union
注意:我之前建议过unionAll
,但在 Spark 2.0 中已弃用
因为这两个数据帧的架构相同,您可以执行 union
然后执行 groupby
id 和 aggregate
计数。
step1: df3 = df1.union(df2);
step2: df3.groupBy("Item Id", "item").agg(sum("count").as("count"));
推荐@wandermonk 的解决方案,因为它不使用连接。尽可能避免连接,因为这会触发洗牌(也称为广泛转换并导致通过网络传输数据,这既昂贵又缓慢)
您还必须查看您的数据大小(两个表都很大或一大一小等),因此您可以调整它的性能方面。
我尝试通过使用 SparkSQL 的解决方案向小组展示,因为他们做同样的事情但更容易理解和操作。
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
list_1 = [[1,"item 1" , 2],[2 ,"item 2", 3],[1 ,"item 3" ,2],[3 ,"item 4" , 5]]
list_2 = [[1,"item 1",2],[3 ,"item 4",2],[4 ,"item 4",4],[5 ,"item 5",2]]
my_schema = StructType([StructField("Item_ID",IntegerType(), True),StructField("Item_Name",StringType(), True ),StructField("Quantity",IntegerType(), True)])
df1 = spark.createDataFrame(list_1, my_schema)
df2 = spark.createDataFrame(list_2, my_schema)
df1.createOrReplaceTempView("df1")
df1.createOrReplaceTempView("df2")
df3 = df2.union(df1)
df3.createOrReplaceTempView("df3")
df4 = spark.sql("select Item_ID, Item_Name, sum(Quantity) as Quantity from df3 group by Item_ID, Item_Name")
df4.show(10)
现在,如果您查看 SparkUI,您可以看到对于如此小的数据集、洗牌操作和阶段数。
这么小的工作的阶段数
通过命令对本组的随机操作进行编号
我还建议查看 SQL 计划并了解费用。交换代表这里的洗牌。
== Physical Plan ==
*(2) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, Quantity#32L])
+- Exchange hashpartitioning(Item_ID#6, Item_Name#7, 200)
+- *(1) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[partial_sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, sum#38L])
+- Union
:- Scan ExistingRDD[Item_ID#6,Item_Name#7,Quantity#8]
+- Scan ExistingRDD[Item_ID#0,Item_Name#1,Quantity#2]