RDD 上的 Spark 左外连接和重复键
Spark left outer join and duplicate keys on RDDs
我有两个(键,值)RDD。我的第二个 RDD 比我的第一个 RDD 短。我想将我的第一个 RDD 的每个值与第二个 RDD 中的相应值相关联,相对于键。
val (rdd1: RDD[(key,A)])
val (rdd2: RDD[(key,B)])
val (rdd3: RDD[R])
与rdd1.count() >> rdd2.count(),并且rdd1的多个元素有相同的key。
现在,我知道当在 rdd2 中找不到相应的键时,我想对 b 使用 常量 值。我认为 leftOuterJoin 是在这里使用的自然方法:
val rdd3 = rdd1.leftOuterJoin(rdd2).map{
case (key,(a,None)) => R(a,c)
case (key,(a,Some(b)) => R(a,b)
}
这里有什么地方会让您觉得不对吗?加入这样的元素时,我得到了意想不到的结果。
不完全确定你的问题是什么,但这里是:
方法 1
val rdd1 = sc.parallelize(Array((1, 100), (2,200), (3,300) ))
val rdd2 = sc.parallelize(Array((1,100)))
object Example {
val c = -999
def myFunc = {
val enclosedC = c
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map ( x => x match {
case (x._1, (x._2._1, None)) => (x._1, (Some(x._2._1), Some(enclosedC)))
case _ => (x._1, (Some(x._2._1), x._2._2 ))
}).sortByKey()
//rdd4.foreach(println)
}
}
Example.myFunc
方法二
val rdd1 = sc.parallelize(Array((1, 100), (2,200), (3,300) ))
val rdd2 = sc.parallelize(Array((1,100)))
object Example {
val c = -999
def myFunc = {
val enclosedC = c
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map(x => { if (x._2._2 == None) ( (x._1, (Some(x._2._1), Some(enclosedC)) )) else ( (x._1, (Some(x._2._1), x._2._2)) ) }).sortByKey()
//rdd4.foreach(println)
}
}
Example.myFunc
方法 3
val rdd1 = sc.parallelize(Array((1, 100), (2,200), (3,300) ))
val rdd2 = sc.parallelize(Array((1,100)))
object Example extends Serializable {
val c = -999
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map(x => { if (x._2._2 == None) ( (x._1, (Some(x._2._1), Some(c)) )) else ( (x._1, (Some(x._2._1), x._2._2)) ) }).sortByKey()
//rdd4.collect
//rdd4.foreach(println)
}
Example
我有两个(键,值)RDD。我的第二个 RDD 比我的第一个 RDD 短。我想将我的第一个 RDD 的每个值与第二个 RDD 中的相应值相关联,相对于键。
val (rdd1: RDD[(key,A)])
val (rdd2: RDD[(key,B)])
val (rdd3: RDD[R])
与rdd1.count() >> rdd2.count(),并且rdd1的多个元素有相同的key。
现在,我知道当在 rdd2 中找不到相应的键时,我想对 b 使用 常量 值。我认为 leftOuterJoin 是在这里使用的自然方法:
val rdd3 = rdd1.leftOuterJoin(rdd2).map{
case (key,(a,None)) => R(a,c)
case (key,(a,Some(b)) => R(a,b)
}
这里有什么地方会让您觉得不对吗?加入这样的元素时,我得到了意想不到的结果。
不完全确定你的问题是什么,但这里是:
方法 1
val rdd1 = sc.parallelize(Array((1, 100), (2,200), (3,300) ))
val rdd2 = sc.parallelize(Array((1,100)))
object Example {
val c = -999
def myFunc = {
val enclosedC = c
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map ( x => x match {
case (x._1, (x._2._1, None)) => (x._1, (Some(x._2._1), Some(enclosedC)))
case _ => (x._1, (Some(x._2._1), x._2._2 ))
}).sortByKey()
//rdd4.foreach(println)
}
}
Example.myFunc
方法二
val rdd1 = sc.parallelize(Array((1, 100), (2,200), (3,300) ))
val rdd2 = sc.parallelize(Array((1,100)))
object Example {
val c = -999
def myFunc = {
val enclosedC = c
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map(x => { if (x._2._2 == None) ( (x._1, (Some(x._2._1), Some(enclosedC)) )) else ( (x._1, (Some(x._2._1), x._2._2)) ) }).sortByKey()
//rdd4.foreach(println)
}
}
Example.myFunc
方法 3
val rdd1 = sc.parallelize(Array((1, 100), (2,200), (3,300) ))
val rdd2 = sc.parallelize(Array((1,100)))
object Example extends Serializable {
val c = -999
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map(x => { if (x._2._2 == None) ( (x._1, (Some(x._2._1), Some(c)) )) else ( (x._1, (Some(x._2._1), x._2._2)) ) }).sortByKey()
//rdd4.collect
//rdd4.foreach(println)
}
Example