Scala 如何匹配两个 dfs 如果数学然后更新第一个 df 中的键
Scala how to match two dfs if mathes then update the key in first df
我有两个数据帧中的数据:
selectedPersonDF:
ID key
1
2
3
4
5
selectedDetailsDF:
first second third key
--------------------------
1 9 9 777
9 8 8 878
8 10 10 765
10 12 19 909
11 2 20 708
代码:
val personDF = spark.read.option("header", "true").option("inferSchema", "false").csv("person.csv")
val detailsDF = spark.read.option("header", "true").option("inferSchema", "false").csv("details.csv")
val selectedPersonDF=personDF.select((col("ID"),col("key"))).show()
val selectedDetailsDF=detailsDF.select(col("first"),col("second"),col("third"),col("key")).show()
我必须将 selectedPersonDF id 列与 selectedDetailsDF 的所有列(第一、第二、第三)相匹配,如果任何列数据与 persons id 匹配,那么我们必须从 selectedDetailsDF 中获取键值并且必须更新selectedPersonDF 键列。
预期输出(在 selectedPersonDF 中):
ID key
1 777
2 708
3
4
5
并且在从 persons'df 中删除第一行之后,因为它与 detailsdf 匹配,剩余数据应该存储在另一个 df 中。
您可以使用 join
并使用 ||
条件检查 和 left join
作为
val finalDF = selectedPersonDF.join(selectedDetailsDF.withColumnRenamed("key", "key2"), $"ID" === $"first" || $"ID" === $"second" || $"ID" === $"third", "left")
.select($"ID", $"key2".as("key"))
.show(false)
所以finalDF
应该给你
+---+----+
|ID |key |
+---+----+
|1 |777 |
|2 |708 |
|3 |null|
|4 |null|
|5 |null|
+---+----+
我们可以在上面的 dataframe 上调用 .na.fill("")
(key
列必须是 StringType()
)以获得
+---+---+
|ID |key|
+---+---+
|1 |777|
|2 |708|
|3 | |
|4 | |
|5 | |
+---+---+
之后你可以使用filter
将dataframe分成匹配和不匹配 using key
column with value and null repectively
val notMatchingDF = finalDF.filter($"key" === "")
val matchingDF = finalDF.except(notMatchingDF)
如果 selectedDetailsDF 的列名称除键列外未知,则已更新
如果第二个数据帧的列名称是未知那么你将不得不形成未知列的 array
列作为
val columnsToCheck = selectedDetailsDF.columns.toSet - "key" toList
import org.apache.spark.sql.functions._
val tempSelectedDetailsDF = selectedDetailsDF.select(array(columnsToCheck.map(col): _*).as("array"), col("key").as("key2"))
现在 tempSelectedDetailsDF
数据框有两列:所有未知列的组合列作为 array
列,键列重命名为 key2
。
之后,您将需要一个 udf
函数来在加入
时检查条件
val arrayContains = udf((array: collection.mutable.WrappedArray[String], value: String) => array.contains(value))
然后你 join
数据帧使用对定义的 udf
函数的调用作为
val finalDF = selectedPersonDF.join(tempSelectedDetailsDF, arrayContains($"array", $"ID"), "left")
.select($"ID", $"key2".as("key"))
.na.fill("")
流程的其余部分已在上面定义。
希望回答对您有所帮助且易于理解。
我有两个数据帧中的数据:
selectedPersonDF:
ID key
1
2
3
4
5
selectedDetailsDF:
first second third key
--------------------------
1 9 9 777
9 8 8 878
8 10 10 765
10 12 19 909
11 2 20 708
代码:
val personDF = spark.read.option("header", "true").option("inferSchema", "false").csv("person.csv")
val detailsDF = spark.read.option("header", "true").option("inferSchema", "false").csv("details.csv")
val selectedPersonDF=personDF.select((col("ID"),col("key"))).show()
val selectedDetailsDF=detailsDF.select(col("first"),col("second"),col("third"),col("key")).show()
我必须将 selectedPersonDF id 列与 selectedDetailsDF 的所有列(第一、第二、第三)相匹配,如果任何列数据与 persons id 匹配,那么我们必须从 selectedDetailsDF 中获取键值并且必须更新selectedPersonDF 键列。
预期输出(在 selectedPersonDF 中):
ID key
1 777
2 708
3
4
5
并且在从 persons'df 中删除第一行之后,因为它与 detailsdf 匹配,剩余数据应该存储在另一个 df 中。
您可以使用 join
并使用 ||
条件检查 和 left join
作为
val finalDF = selectedPersonDF.join(selectedDetailsDF.withColumnRenamed("key", "key2"), $"ID" === $"first" || $"ID" === $"second" || $"ID" === $"third", "left")
.select($"ID", $"key2".as("key"))
.show(false)
所以finalDF
应该给你
+---+----+
|ID |key |
+---+----+
|1 |777 |
|2 |708 |
|3 |null|
|4 |null|
|5 |null|
+---+----+
我们可以在上面的 dataframe 上调用 .na.fill("")
(key
列必须是 StringType()
)以获得
+---+---+
|ID |key|
+---+---+
|1 |777|
|2 |708|
|3 | |
|4 | |
|5 | |
+---+---+
之后你可以使用filter
将dataframe分成匹配和不匹配 using key
column with value and null repectively
val notMatchingDF = finalDF.filter($"key" === "")
val matchingDF = finalDF.except(notMatchingDF)
如果 selectedDetailsDF 的列名称除键列外未知,则已更新
如果第二个数据帧的列名称是未知那么你将不得不形成未知列的 array
列作为
val columnsToCheck = selectedDetailsDF.columns.toSet - "key" toList
import org.apache.spark.sql.functions._
val tempSelectedDetailsDF = selectedDetailsDF.select(array(columnsToCheck.map(col): _*).as("array"), col("key").as("key2"))
现在 tempSelectedDetailsDF
数据框有两列:所有未知列的组合列作为 array
列,键列重命名为 key2
。
之后,您将需要一个 udf
函数来在加入
val arrayContains = udf((array: collection.mutable.WrappedArray[String], value: String) => array.contains(value))
然后你 join
数据帧使用对定义的 udf
函数的调用作为
val finalDF = selectedPersonDF.join(tempSelectedDetailsDF, arrayContains($"array", $"ID"), "left")
.select($"ID", $"key2".as("key"))
.na.fill("")
流程的其余部分已在上面定义。
希望回答对您有所帮助且易于理解。