Scala 如何匹配两个 dfs 如果数学然后更新第一个 df 中的键

Scala how to match two dfs if mathes then update the key in first df

我有两个数据帧中的数据:

selectedPersonDF:

ID    key
1     
2     
3
4
5

selectedDetailsDF:

first  second third  key
--------------------------
1       9       9    777
9       8       8    878
8       10      10   765
10      12      19   909
11      2       20   708

代码:

val personDF = spark.read.option("header", "true").option("inferSchema", "false").csv("person.csv")
val detailsDF = spark.read.option("header", "true").option("inferSchema", "false").csv("details.csv")

val selectedPersonDF=personDF.select((col("ID"),col("key"))).show() 
val selectedDetailsDF=detailsDF.select(col("first"),col("second"),col("third"),col("key")).show()

我必须将 selectedPersonDF id 列与 selectedDetailsDF 的所有列(第一、第二、第三)相匹配,如果任何列数据与 persons id 匹配,那么我们必须从 selectedDetailsDF 中获取键值并且必须更新selectedPersonDF 键列。

预期输出(在 selectedPersonDF 中):

ID    key
1     777
2     708     
3
4
5

并且在从 persons'df 中删除第一行之后,因为它与 detailsdf 匹配,剩余数据应该存储在另一个 df 中。

您可以使用 join 并使用 || 条件检查 left join 作为

val finalDF = selectedPersonDF.join(selectedDetailsDF.withColumnRenamed("key", "key2"), $"ID" === $"first" || $"ID" === $"second" || $"ID" === $"third", "left")
  .select($"ID", $"key2".as("key"))
  .show(false)

所以finalDF应该给你

+---+----+
|ID |key |
+---+----+
|1  |777 |
|2  |708 |
|3  |null|
|4  |null|
|5  |null|
+---+----+

我们可以在上面的 dataframe 上调用 .na.fill("")key 列必须是 StringType())以获得

+---+---+
|ID |key|
+---+---+
|1  |777|
|2  |708|
|3  |   |
|4  |   |
|5  |   |
+---+---+

之后你可以使用filterdataframe分成匹配不匹配 using key column with value and null repectively

val notMatchingDF = finalDF.filter($"key" === "")
val matchingDF = finalDF.except(notMatchingDF)

如果 selectedDetailsDF 的列名称除键列外未知,则已更新

如果第二个数据帧列名称未知那么你将不得不形成未知列的 array 列作为

val columnsToCheck = selectedDetailsDF.columns.toSet - "key" toList

import org.apache.spark.sql.functions._
val tempSelectedDetailsDF = selectedDetailsDF.select(array(columnsToCheck.map(col): _*).as("array"), col("key").as("key2"))

现在 tempSelectedDetailsDF 数据框有两列:所有未知列的组合列作为 array 列,键列重命名为 key2

之后,您将需要一个 udf 函数来在加入

时检查条件
val arrayContains = udf((array: collection.mutable.WrappedArray[String], value: String) => array.contains(value))

然后你 join 数据帧使用对定义的 udf 函数的调用作为

val finalDF = selectedPersonDF.join(tempSelectedDetailsDF, arrayContains($"array", $"ID"), "left")
  .select($"ID", $"key2".as("key"))
  .na.fill("")

流程的其余部分已在上面定义。

希望回答对您有所帮助且易于理解。