如何在给定多个条件的情况下对 Spark 数据帧执行 "Lookup" 操作
How to perform "Lookup" operation on Spark dataframes given multiple conditions
我是 Spark 的新手(我的版本是 1.6.0),现在我正在尝试解决下面给出的问题:
假设有两个源文件:
- 第一个(简称A)比较大,包含A1、B1、C1等80列。里面有230K条记录。
- 第二个(简称 B)是一个小型查找 table,其中包含名为 A2、B2、C2 和 D2 的列。里面有250条记录。
现在我们需要在 A 中插入一个新列,逻辑如下:
- 首先在B中查找A1、B1、C1(对应的列为A2、B2、C2),如果成功,则returnD2作为新增列的值。如果没有找到...
- 然后在B中查找A1,B1,如果成功,returnD2。如果没有找到...
- 设置默认值“NA”
我已经读入文件并将它们转换成数据框。对于第一种情况,我通过将它们连接在一起得到了结果。但是我找不到下一步的好方法。
我目前的尝试是通过使用不太严格的条件连接 A 和 B 来构建新的数据框。但是我不知道如何从另一个更新当前数据框。或者有没有其他更直观有效的方法来解决整个问题?
谢谢大家的回答。
----------------------------更新于20160309------------ ------------------
终于接受了@mlk 的回答。仍然非常感谢 @zero323 his/her 关于 UDF 与 join 的精彩评论,Tungsten 代码生成确实是我们现在面临的另一个问题。但是由于我们需要做几十次查找,每次查找平均4个条件,所以前一个方案比较suitable...
最终的解决方案看起来像下面的片段:
```
import sqlContext.implicits._
import com.github.marklister.collections.io._
case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
(aStr: String, bStr: String, cStr: String) =>
tableBroadcast.value.find {
case TableType(a, b, c, _) =>
(a == aStr && b == bStr && c == cStr) ||
(a == aStr && b == bStr)
}.getOrElse(TableType("", "", "", "NA")).D
}
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
由于 B 很小,我认为最好的方法是广播变量和用户定义的函数。
// However you get the data...
case class BType( A2: Int, B2: Int, C2 : Int, D2 : String)
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200"))
val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER")
// Broadcast B so all nodes have a copy of it.
val Bbradcast = sc.broadcast(B)
// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example.
val findD = udf {( a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 }
// Use the UDF in a select
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show
仅供参考没有UDF的解决方案:
val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1"))
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2"))
// Match A, B and C
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1")
// Match A and B mismatch C
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2")
val toDrop = b1.columns ++ b2.columns
toDrop.foldLeft(a
.join(b1, expr1, "leftouter")
.join(b2, expr2, "leftouter")
// If there is match on A, B, C then D_1 should be not NULL
// otherwise we fall-back to D_2
.withColumn("D", coalesce($"D_1", $"D_2"))
)((df, c) => df.drop(c))
这假定每个类别(所有三列或前两列)中最多有一个匹配项,或者需要输出中的重复行。
UDF 与 JOIN:
有多种因素需要考虑,这里没有简单的答案:
缺点:
- 广播
joins
需要向工作节点传递两次数据。至于现在 broadcasted
个表未缓存 (SPARK-3863),并且在不久的将来不太可能更改(解决方案:稍后)。
join
即使完全匹配,操作也会应用两次。
优点:
join
和 coalesce
对优化器是透明的,而 UDF 不是。
- 直接使用 SQL 表达式可以受益于所有 Tungsten 优化,包括代码生成,而 UDF 则不能。
我是 Spark 的新手(我的版本是 1.6.0),现在我正在尝试解决下面给出的问题:
假设有两个源文件:
- 第一个(简称A)比较大,包含A1、B1、C1等80列。里面有230K条记录。
- 第二个(简称 B)是一个小型查找 table,其中包含名为 A2、B2、C2 和 D2 的列。里面有250条记录。
现在我们需要在 A 中插入一个新列,逻辑如下:
- 首先在B中查找A1、B1、C1(对应的列为A2、B2、C2),如果成功,则returnD2作为新增列的值。如果没有找到...
- 然后在B中查找A1,B1,如果成功,returnD2。如果没有找到...
- 设置默认值“NA”
我已经读入文件并将它们转换成数据框。对于第一种情况,我通过将它们连接在一起得到了结果。但是我找不到下一步的好方法。
我目前的尝试是通过使用不太严格的条件连接 A 和 B 来构建新的数据框。但是我不知道如何从另一个更新当前数据框。或者有没有其他更直观有效的方法来解决整个问题?
谢谢大家的回答。
----------------------------更新于20160309------------ ------------------
终于接受了@mlk 的回答。仍然非常感谢 @zero323 his/her 关于 UDF 与 join 的精彩评论,Tungsten 代码生成确实是我们现在面临的另一个问题。但是由于我们需要做几十次查找,每次查找平均4个条件,所以前一个方案比较suitable...
最终的解决方案看起来像下面的片段:
```
import sqlContext.implicits._
import com.github.marklister.collections.io._
case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
(aStr: String, bStr: String, cStr: String) =>
tableBroadcast.value.find {
case TableType(a, b, c, _) =>
(a == aStr && b == bStr && c == cStr) ||
(a == aStr && b == bStr)
}.getOrElse(TableType("", "", "", "NA")).D
}
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
由于 B 很小,我认为最好的方法是广播变量和用户定义的函数。
// However you get the data...
case class BType( A2: Int, B2: Int, C2 : Int, D2 : String)
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200"))
val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER")
// Broadcast B so all nodes have a copy of it.
val Bbradcast = sc.broadcast(B)
// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example.
val findD = udf {( a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 }
// Use the UDF in a select
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show
仅供参考没有UDF的解决方案:
val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1"))
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2"))
// Match A, B and C
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1")
// Match A and B mismatch C
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2")
val toDrop = b1.columns ++ b2.columns
toDrop.foldLeft(a
.join(b1, expr1, "leftouter")
.join(b2, expr2, "leftouter")
// If there is match on A, B, C then D_1 should be not NULL
// otherwise we fall-back to D_2
.withColumn("D", coalesce($"D_1", $"D_2"))
)((df, c) => df.drop(c))
这假定每个类别(所有三列或前两列)中最多有一个匹配项,或者需要输出中的重复行。
UDF 与 JOIN:
有多种因素需要考虑,这里没有简单的答案:
缺点:
- 广播
joins
需要向工作节点传递两次数据。至于现在broadcasted
个表未缓存 (SPARK-3863),并且在不久的将来不太可能更改(解决方案:稍后)。 join
即使完全匹配,操作也会应用两次。
优点:
join
和coalesce
对优化器是透明的,而 UDF 不是。- 直接使用 SQL 表达式可以受益于所有 Tungsten 优化,包括代码生成,而 UDF 则不能。