当我遍历 RDD 中的广播变量时,如何获得正确的 RDD 映射输出?
How can I get the right RDD map output, when I iterate over a broadcast variable inside the RDD?
我的代码如下所示:
val conf = new SparkConf().setAppName("Sample").setMaster("local")
val sc = new SparkContext(conf)
val rdd1: RDD[(Int, Int)] = sc.parallelize(Seq((1,1),(2,3),(3,4))
val rdd2= RDD[(Int, Int)] = sc.parallelize(Seq((2,1),(3,2),(7,6))
val rdd2AsMap = rdd2.collectAsMap.toMap
val broadcastMap = sc.broadcast(rdd2AsMap)
val result = rdd1.map{case(x,y) => {
for((key,value) <- broadcastMap .value) {
(x,key)
}
}}
result.saveAsTextFile("file:///home/cjohnson/output")
写入文件的预期输出应为:
(1,2)
(1,3)
(1,7)
(2,2)
(2,3)
(2,7)
(3,2)
(3,3)
(3,7)
但我将此输出写入文件:
()
()
()
我该如何解决这个问题?
P.S。这只是我提供的一些小样本数据,用于证明我的问题。实际数据要大得多。
- 那里面的
for
returns单位即()
因为你忘了加上yield
:
val a = for((key, _) <- Map(1 -> "")) yield { (key) }
- 您需要
flatMap
而不是 map
才能在每个 rdd 密钥和广播映射密钥之间生成该产品。
关于您的问题,我将如何处理:
rdd1
.keys
.flatMap { rddKey =>
broadcastMap
.value
.keys
.map(broadcastKey => (rddKey, broadcastKey))
}
稍后编辑:
可以写成笛卡尔坐标
rdd1
.keys
.cartesian(rdd2.keys)
我的代码如下所示:
val conf = new SparkConf().setAppName("Sample").setMaster("local")
val sc = new SparkContext(conf)
val rdd1: RDD[(Int, Int)] = sc.parallelize(Seq((1,1),(2,3),(3,4))
val rdd2= RDD[(Int, Int)] = sc.parallelize(Seq((2,1),(3,2),(7,6))
val rdd2AsMap = rdd2.collectAsMap.toMap
val broadcastMap = sc.broadcast(rdd2AsMap)
val result = rdd1.map{case(x,y) => {
for((key,value) <- broadcastMap .value) {
(x,key)
}
}}
result.saveAsTextFile("file:///home/cjohnson/output")
写入文件的预期输出应为:
(1,2)
(1,3)
(1,7)
(2,2)
(2,3)
(2,7)
(3,2)
(3,3)
(3,7)
但我将此输出写入文件:
()
()
()
我该如何解决这个问题?
P.S。这只是我提供的一些小样本数据,用于证明我的问题。实际数据要大得多。
- 那里面的
for
returns单位即()
因为你忘了加上yield
:
val a = for((key, _) <- Map(1 -> "")) yield { (key) }
- 您需要
flatMap
而不是map
才能在每个 rdd 密钥和广播映射密钥之间生成该产品。
关于您的问题,我将如何处理:
rdd1
.keys
.flatMap { rddKey =>
broadcastMap
.value
.keys
.map(broadcastKey => (rddKey, broadcastKey))
}
稍后编辑:
可以写成笛卡尔坐标
rdd1
.keys
.cartesian(rdd2.keys)