Spark 增量加载覆盖旧记录
Spark incremental loading overwrite old record
我需要使用 Spark (PySpark)
对 table 进行增量加载
示例如下:
第 1 天
id | value
-----------
1 | abc
2 | def
第 2 天
id | value
-----------
2 | cde
3 | xyz
预期结果
id | value
-----------
1 | abc
2 | cde
3 | xyz
这可以在关系数据库中轻松完成,
想知道这是否可以在 Spark 或其他转换工具中完成,例如急?
数据帧附加由 pyspark 中的 union
函数完成。我将用一个示例进行演示,并按照您在问题中提到的那样创建 2 个数据帧。
from pyspark.sql.types import Row
df1 = sqlContext.createDataFrame([Row(id=1,value="abc"),Row(id=2,value="def")])
df1.show()
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
df2 = sqlContext.createDataFrame([Row(id=2,value="cde"),Row(id=3,value="xyz")])
df2.show()
+---+-----+
| id|value|
+---+-----+
| 2| cde|
| 3| xyz|
+---+-----+
让我们在两个数据帧之间做一个union
,你会得到想要的结果。
df2.union(df1).dropDuplicates(["id"]).show()
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 3| xyz|
| 2| cde|
+---+-----+
您可以使用 asc
从 pyspark.sql.functions
对输出进行排序
from pyspark.sql.functions import asc
df2.union(df1).dropDuplicates(["id"]).sort(asc("id")).show()
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| cde|
| 3| xyz|
+---+-----+
解决方法,在dataframe中添加一个日期列,然后根据id进行排名,按日期降序排列,并取rank == 1。它总是会根据id给你最新的记录。
df.("rank", rank().over(Window.partitionBy($"id").orderBy($"date".desc)))
.filter($"rank" === 1)
.drop($"rank")
.orderBy($"id")
.show
给你!
第一个数据框:
>>> list1 = [(1, 'abc'),(2,'def')]
>>> olddf = spark.createDataFrame(list1, ['id', 'value'])
>>> olddf.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
第二个数据框:
>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
| 2| cde|
| 3| xyz|
+---+-----+
现在使用完全外部连接连接并合并这两个数据名并使用合并功能,同时 select 并且可以用用户定义的值替换空值。
from pyspark.sql.functions import *
>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 3| xyz|
| 2| cde|
+---+-----+
希望这能解决您的问题。 :-)
我需要使用 Spark (PySpark)
对 table 进行增量加载示例如下:
第 1 天
id | value
-----------
1 | abc
2 | def
第 2 天
id | value
-----------
2 | cde
3 | xyz
预期结果
id | value
-----------
1 | abc
2 | cde
3 | xyz
这可以在关系数据库中轻松完成,
想知道这是否可以在 Spark 或其他转换工具中完成,例如急?
数据帧附加由 pyspark 中的 union
函数完成。我将用一个示例进行演示,并按照您在问题中提到的那样创建 2 个数据帧。
from pyspark.sql.types import Row
df1 = sqlContext.createDataFrame([Row(id=1,value="abc"),Row(id=2,value="def")])
df1.show()
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
df2 = sqlContext.createDataFrame([Row(id=2,value="cde"),Row(id=3,value="xyz")])
df2.show()
+---+-----+
| id|value|
+---+-----+
| 2| cde|
| 3| xyz|
+---+-----+
让我们在两个数据帧之间做一个union
,你会得到想要的结果。
df2.union(df1).dropDuplicates(["id"]).show()
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 3| xyz|
| 2| cde|
+---+-----+
您可以使用 asc
从 pyspark.sql.functions
from pyspark.sql.functions import asc
df2.union(df1).dropDuplicates(["id"]).sort(asc("id")).show()
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| cde|
| 3| xyz|
+---+-----+
解决方法,在dataframe中添加一个日期列,然后根据id进行排名,按日期降序排列,并取rank == 1。它总是会根据id给你最新的记录。
df.("rank", rank().over(Window.partitionBy($"id").orderBy($"date".desc)))
.filter($"rank" === 1)
.drop($"rank")
.orderBy($"id")
.show
给你! 第一个数据框:
>>> list1 = [(1, 'abc'),(2,'def')]
>>> olddf = spark.createDataFrame(list1, ['id', 'value'])
>>> olddf.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
第二个数据框:
>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
| 2| cde|
| 3| xyz|
+---+-----+
现在使用完全外部连接连接并合并这两个数据名并使用合并功能,同时 select 并且可以用用户定义的值替换空值。
from pyspark.sql.functions import *
>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 3| xyz|
| 2| cde|
+---+-----+
希望这能解决您的问题。 :-)