用 PySpark 中的其他 Dataframe 替换空值

Replace null values with other Dataframe in PySpark

我有一些产品数据 (DF),但有些没有描述。我有一个 excel 文件,其中包含一些(加载为地图)的描述。现在我想用 Map 的值填充 DF 中的缺失值,并且已经有描述的行使用 Pyspark 保持它们不变。

DF
Id | Desc
01 | 'desc1'
02 | null
03 | 'desc3'
04 | null

Map
Key | Value
2   | 'desc2'
4   | 'desc4'

Output
Id | Desc
1  | 'desc1'
2  | 'desc2'
3  | 'desc3'
4  | 'desc4'

提前致谢

您需要确保 DF.Id 字段和 Map.Key 字段相同 type/values(目前,它们看起来不像前导 0),然后进行左连接,然后 select 所需的列与 coalesce()。我的pySpark有点生疏,所以我会在scala中提供解决方案。道理应该是一样的。

val df = Seq(
    (1, "desc1"),
    (2, null),
    (3, "desc3"),
    (4, null)
).toDF("Id", "Desc")

val map = Seq(
    (2, "desc2"),
    (4, "desc4")
).toDF("Key", "Value")

df.show()
map.show()

df.join(map, df("Id") === map("Key"), "left")
  .select(
      df("Id"),
      coalesce(df("Desc"), $"Value").as("Desc")
      )
  .show()

产量:

+---+-----+
| Id| Desc|
+---+-----+
|  1|desc1|
|  2| null|
|  3|desc3|
|  4| null|
+---+-----+

+---+-----+
|Key|Value|
+---+-----+
|  2|desc2|
|  4|desc4|
+---+-----+

+---+-----+
| Id| Desc|
+---+-----+
|  1|desc1|
|  2|desc2|
|  3|desc3|
|  4|desc4|
+---+-----+

在 PySpark 中,借助 UDF:

schema = StructType([StructField("Index", IntegerType(), True),
                    StructField("Desc", StringType(), True)])

DF = sc.parallelize([(1, "desc1"), (2,None), (3,"desc3"), (4, None)]).toDF(schema)

myMap = {
      2: "desc2",
      4 : "desc4"
    }

myMapBroadcasted = sc.broadcast(myMap)

@udf(StringType())
def fillNone(Index, Desc):
  if Desc is None:
    if Index in myMapBroadcasted.value:
      return myMapBroadcasted.value[Index]
  return Desc

DF.withColumn('Desc', fillNone(col('Index'), col('Desc'))).show()

很难知道您提供的数据集的 cardinality...这可能会如何改变这里的解决方案的一些例子是:

  1. 如果 "DF" 和 "Map" 有重叠的描述...我们应该如何优先考虑哪个 table 有 "right" 描述?
  2. 您要创建的最终数据框是否需要完全包含 ID 或描述列表?这些数据帧中的任何一个都有完整列表吗?这也可能会改变解决方案。

我做了一些假设,以便您自己确定什么是正确的方法:

  • 我假设 "DF" 包含整个 ID 列表
  • 我假设 "Map" 只有 ID 的一个子集,并不完全包含 "DF"
  • 中存在的更广泛的 ID 集

我在这里使用 PySpark:

DF = DF.na.drop() # we'll eliminate the missing values from the parent dataframe
DF_Output = DF.join(Map, on = "ID", how = 'outer')