Dataframe lookup and optimization
I am using spark-sql-2.4.3v with Java. I have the following scenario:
val data = List(
  ("20", "score", "school", 14, 12),
  ("21", "score", "school", 13, 13),
  ("22", "rate", "school", 11, 14),
  ("23", "score", "school", 11, 14),
  ("24", "rate", "school", 12, 12),
  ("25", "score", "school", 11, 14)
)
val df = data.toDF("id", "code", "entity", "value1", "value2")
df.show
// this lookup data is populated from the DB
val ll = List(
  ("aaaa", 11),
  ("aaa", 12),
  ("aa", 13),
  ("a", 14)
)
val codeValudeDf = ll.toDF("code", "value")
codeValudeDf.show
In the final output I need to map "code" to "value", but only for those rows/records in the "data" dataframe whose "code" is "score".
How can I look the values up in a hashmap built from codeValudeDf, so that I get the output below?
+---+-----+------+------+------+
| id| code|entity|value1|value2|
+---+-----+------+------+------+
| 20|score|school|     a|   aaa|
| 21|score|school|    aa|    aa|
| 22| rate|school|    11|    14|
| 23|score|school|  aaaa|     a|
| 24| rate|school|    12|    12|
| 25|score|school|  aaaa|     a|
+---+-----+------+------+------+
Is there any way to make this lookup optimal, i.e. so that I do not have to pull the lookup dataframe from the database every time?
If the lookup data is small, you can create a Map and broadcast it. The broadcasted map can easily be used in a udf, as shown below.
Load the test data provided in the question:
val data = List(
  ("20", "score", "school", 14, 12),
  ("21", "score", "school", 13, 13),
  ("22", "rate", "school", 11, 14),
  ("23", "score", "school", 11, 14),
  ("24", "rate", "school", 12, 12),
  ("25", "score", "school", 11, 14)
)
val df = data.toDF("id", "code", "entity", "value1", "value2")
df.show
/**
* +---+-----+------+------+------+
* | id| code|entity|value1|value2|
* +---+-----+------+------+------+
* | 20|score|school| 14| 12|
* | 21|score|school| 13| 13|
* | 22| rate|school| 11| 14|
* | 23|score|school| 11| 14|
* | 24| rate|school| 12| 12|
* | 25|score|school| 11| 14|
* +---+-----+------+------+------+
*/
// this lookup data is populated from the DB
val ll = List(
  ("aaaa", 11),
  ("aaa", 12),
  ("aa", 13),
  ("a", 14)
)
val codeValudeDf = ll.toDF("code", "value")
codeValudeDf.show
/**
* +----+-----+
* |code|value|
* +----+-----+
* |aaaa| 11|
* | aaa| 12|
* | aa| 13|
* | a| 14|
* +----+-----+
*/
The broadcasted map can then be used in the udf as follows:
// collect the small lookup dataframe on the driver as a value -> code map
// and broadcast it to all executors
val lookUp = spark.sparkContext
  .broadcast(codeValudeDf.map { case Row(code: String, value: Integer) => value -> code }
    .collect().toMap)

// udf that translates a numeric value into its code via the broadcasted map
val look_up = udf((value: Integer) => lookUp.value.get(value))

// apply the lookup only where code == "score"; other rows keep their value
df.withColumn("value1",
    when($"code" === "score", look_up($"value1")).otherwise($"value1".cast("string")))
  .withColumn("value2",
    when($"code" === "score", look_up($"value2")).otherwise($"value2".cast("string")))
  .show(false)
/**
* +---+-----+------+------+------+
* |id |code |entity|value1|value2|
* +---+-----+------+------+------+
* |20 |score|school|a |aaa |
* |21 |score|school|aa |aa |
* |22 |rate |school|11 |14 |
* |23 |score|school|aaaa |a |
* |24 |rate |school|12 |12 |
* |25 |score|school|aaaa |a |
* +---+-----+------+------+------+
*/
Using a broadcast map indeed looks like a wise decision, since you do not need to hit the database every time to pull the lookup data.
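For completeness, here is a minimal sketch of a UDF-free alternative (my addition, not part of the answer above): broadcast-join the lookup dataframe directly. The small codeValudeDf is still shipped to each executor only once, and Catalyst can optimize the whole plan:

import org.apache.spark.sql.functions.{broadcast, col, when}

// rename the lookup columns so the two joins stay unambiguous
val lk1 = codeValudeDf.select(col("value").as("v1"), col("code").as("c1"))
val lk2 = codeValudeDf.select(col("value").as("v2"), col("code").as("c2"))

df.join(broadcast(lk1), col("value1") === col("v1"), "left")
  .join(broadcast(lk2), col("value2") === col("v2"), "left")
  .select(
    col("id"), col("code"), col("entity"),
    when(col("code") === "score", col("c1"))
      .otherwise(col("value1").cast("string")).as("value1"),
    when(col("code") === "score", col("c2"))
      .otherwise(col("value2").cast("string")).as("value2"))
  .show(false)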
Here I have solved the problem with a key-value map inside a UDF. I cannot compare its performance with respect to the broadcast-map approach, but comments from Spark experts are welcome.
Step #1: Build the KeyValueMap -
val data = List(
  ("20", "score", "school", 14, 12),
  ("21", "score", "school", 13, 13),
  ("22", "rate", "school", 11, 14),
  ("23", "score", "school", 11, 14),
  ("24", "rate", "school", 12, 12),
  ("25", "score", "school", 11, 14)
)
val df = data.toDF("id", "code", "entity", "value1", "value2")

val ll = List(
  ("aaaa", 11),
  ("aaa", 12),
  ("aa", 13),
  ("a", 14)
)
val codeValudeDf = ll.toDF("code", "value")

val Keys = codeValudeDf.select("value").collect().map(_(0).toString).toList
val Values = codeValudeDf.select("code").collect().map(_(0).toString).toList
val KeyValueMap = Keys.zip(Values).toMap
Step #2: Create the UDF -
def CodeToValue(code: String, key: String): String = {
  if (key == null) return ""
  // rows whose code is not "score" keep their original value
  if (code != "score") return key
  KeyValueMap.getOrElse(key, "not found!")
}

val CodeToValueUDF = udf(CodeToValue(_: String, _: String): String)
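One performance consideration (my observation, not the answerer's measurement): KeyValueMap is captured in the UDF closure and shipped with the tasks of every job that uses it, whereas a broadcast variable is sent to each executor only once, so the broadcast approach should scale better as the lookup grows.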
Step #3: Add the derived columns to the original dataframe using the UDF -
val newdf = df.withColumn("Col1", CodeToValueUDF(col("code"), col("value1")))
val finaldf = newdf.withColumn("Col2", CodeToValueUDF(col("code"), col("value2")))
finaldf.show(false)
+---+-----+------+------+------+----+----+
| id| code|entity|value1|value2|Col1|Col2|
+---+-----+------+------+------+----+----+
| 20|score|school| 14| 12| a| aaa|
| 21|score|school| 13| 13| aa| aa|
| 22| rate|school| 11| 14| 11| 14|
| 23|score|school| 11| 14|aaaa| a|
| 24| rate|school| 12| 12| 12| 12|
| 25|score|school| 11| 14|aaaa| a|
+---+-----+------+------+------+----+----+
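Whichever variant you choose, the "do not pull from the database every time" concern can also be addressed by caching the lookup dataframe right after it is read; a minimal sketch, assuming the lookup table fits in memory:

// read the lookup once, cache it, and reuse it across jobs
codeValudeDf.cache()
codeValudeDf.count()  // force materialization so later uses skip the DB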