用 PySpark 中的其他 Dataframe 替换空值
Replace null values with other Dataframe in PySpark
我有一些产品数据 (DF),但有些没有描述。我有一个 excel 文件,其中包含一些(加载为地图)的描述。现在我想用 Map 的值填充 DF 中的缺失值,并且已经有描述的行使用 Pyspark 保持它们不变。
DF
Id | Desc
01 | 'desc1'
02 | null
03 | 'desc3'
04 | null
Map
Key | Value
2 | 'desc2'
4 | 'desc4'
Output
Id | Desc
1 | 'desc1'
2 | 'desc2'
3 | 'desc3'
4 | 'desc4'
提前致谢
您需要确保 DF.Id
字段和 Map.Key
字段相同 type/values(目前,它们看起来不像前导 0
),然后进行左连接,然后 select 所需的列与 coalesce()
。我的pySpark有点生疏,所以我会在scala中提供解决方案。道理应该是一样的。
val df = Seq(
(1, "desc1"),
(2, null),
(3, "desc3"),
(4, null)
).toDF("Id", "Desc")
val map = Seq(
(2, "desc2"),
(4, "desc4")
).toDF("Key", "Value")
df.show()
map.show()
df.join(map, df("Id") === map("Key"), "left")
.select(
df("Id"),
coalesce(df("Desc"), $"Value").as("Desc")
)
.show()
产量:
+---+-----+
| Id| Desc|
+---+-----+
| 1|desc1|
| 2| null|
| 3|desc3|
| 4| null|
+---+-----+
+---+-----+
|Key|Value|
+---+-----+
| 2|desc2|
| 4|desc4|
+---+-----+
+---+-----+
| Id| Desc|
+---+-----+
| 1|desc1|
| 2|desc2|
| 3|desc3|
| 4|desc4|
+---+-----+
在 PySpark 中,借助 UDF:
schema = StructType([StructField("Index", IntegerType(), True),
StructField("Desc", StringType(), True)])
DF = sc.parallelize([(1, "desc1"), (2,None), (3,"desc3"), (4, None)]).toDF(schema)
myMap = {
2: "desc2",
4 : "desc4"
}
myMapBroadcasted = sc.broadcast(myMap)
@udf(StringType())
def fillNone(Index, Desc):
if Desc is None:
if Index in myMapBroadcasted.value:
return myMapBroadcasted.value[Index]
return Desc
DF.withColumn('Desc', fillNone(col('Index'), col('Desc'))).show()
很难知道您提供的数据集的 cardinality...这可能会如何改变这里的解决方案的一些例子是:
- 如果 "DF" 和 "Map" 有重叠的描述...我们应该如何优先考虑哪个 table 有 "right" 描述?
- 您要创建的最终数据框是否需要完全包含 ID 或描述列表?这些数据帧中的任何一个都有完整列表吗?这也可能会改变解决方案。
我做了一些假设,以便您自己确定什么是正确的方法:
- 我假设 "DF" 包含整个 ID 列表
- 我假设 "Map" 只有 ID 的一个子集,并不完全包含 "DF"
中存在的更广泛的 ID 集
我在这里使用 PySpark:
DF = DF.na.drop() # we'll eliminate the missing values from the parent dataframe
DF_Output = DF.join(Map, on = "ID", how = 'outer')
我有一些产品数据 (DF),但有些没有描述。我有一个 excel 文件,其中包含一些(加载为地图)的描述。现在我想用 Map 的值填充 DF 中的缺失值,并且已经有描述的行使用 Pyspark 保持它们不变。
DF
Id | Desc
01 | 'desc1'
02 | null
03 | 'desc3'
04 | null
Map
Key | Value
2 | 'desc2'
4 | 'desc4'
Output
Id | Desc
1 | 'desc1'
2 | 'desc2'
3 | 'desc3'
4 | 'desc4'
提前致谢
您需要确保 DF.Id
字段和 Map.Key
字段相同 type/values(目前,它们看起来不像前导 0
),然后进行左连接,然后 select 所需的列与 coalesce()
。我的pySpark有点生疏,所以我会在scala中提供解决方案。道理应该是一样的。
val df = Seq(
(1, "desc1"),
(2, null),
(3, "desc3"),
(4, null)
).toDF("Id", "Desc")
val map = Seq(
(2, "desc2"),
(4, "desc4")
).toDF("Key", "Value")
df.show()
map.show()
df.join(map, df("Id") === map("Key"), "left")
.select(
df("Id"),
coalesce(df("Desc"), $"Value").as("Desc")
)
.show()
产量:
+---+-----+
| Id| Desc|
+---+-----+
| 1|desc1|
| 2| null|
| 3|desc3|
| 4| null|
+---+-----+
+---+-----+
|Key|Value|
+---+-----+
| 2|desc2|
| 4|desc4|
+---+-----+
+---+-----+
| Id| Desc|
+---+-----+
| 1|desc1|
| 2|desc2|
| 3|desc3|
| 4|desc4|
+---+-----+
在 PySpark 中,借助 UDF:
schema = StructType([StructField("Index", IntegerType(), True),
StructField("Desc", StringType(), True)])
DF = sc.parallelize([(1, "desc1"), (2,None), (3,"desc3"), (4, None)]).toDF(schema)
myMap = {
2: "desc2",
4 : "desc4"
}
myMapBroadcasted = sc.broadcast(myMap)
@udf(StringType())
def fillNone(Index, Desc):
if Desc is None:
if Index in myMapBroadcasted.value:
return myMapBroadcasted.value[Index]
return Desc
DF.withColumn('Desc', fillNone(col('Index'), col('Desc'))).show()
很难知道您提供的数据集的 cardinality...这可能会如何改变这里的解决方案的一些例子是:
- 如果 "DF" 和 "Map" 有重叠的描述...我们应该如何优先考虑哪个 table 有 "right" 描述?
- 您要创建的最终数据框是否需要完全包含 ID 或描述列表?这些数据帧中的任何一个都有完整列表吗?这也可能会改变解决方案。
我做了一些假设,以便您自己确定什么是正确的方法:
- 我假设 "DF" 包含整个 ID 列表
- 我假设 "Map" 只有 ID 的一个子集,并不完全包含 "DF" 中存在的更广泛的 ID 集
我在这里使用 PySpark:
DF = DF.na.drop() # we'll eliminate the missing values from the parent dataframe
DF_Output = DF.join(Map, on = "ID", how = 'outer')