Spark scala:将键值对中的数值更改为long/integer以连接两个地图

Spark scala: change numeric values in key-value pair to long/integer to join two maps

嗨,我是 spark 使用 scala 的新手。我有两个不同的文件,我已经根据需要创建了两个地图,如下所示:

data 1
1 : 2
2 : 1,3,4
3 : 2,4
4 : 2, 3

map1 将计算“:”之后出现的每个键 map1 的输出是:

(1, 1)
(2, 3)
(3, 2)
(4, 2)

数据 2:

apple
banana
kiwi
orange
strawberry

map2 会告诉元素位置,它的输出是:

(1, apple)
(2, banana)
(3, kiwi)
(4, orange)
(5, strawberry)

我需要的是将两个地图连接起来,输出如下:

(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)

我只能使用org.apache.spark.SparkConf和org.apache.spark.SparkCotext。以下是我目前使用的代码:

    val sc = new SparkContext (conf)
    val data1 = sc.textFile("input.txt")
    val map1 = data1.map(x => x.split(":")(0), x.split(":")(1))).flatMap{case (y,z) => z.split("\s+").map((y,_)}
.filter(_._2.nonEmpty).sortByKey().countByKey()
    val data2 = sc.textFile("input2.txt")
    val map2 = data2.zipWithIndex().map{ case(v, index) => (v,index + 1)}
.map(pair => pair.swap)

    val merge_map = map2.join(map1)

我想加入我制作的两张地图,但它抛出了这样的错误:

type mismatch; 
found: scala.collection.Map[String, Long] 
required: org.apache.spark.rdd.RDD[(Long,?)]

我在想也许我需要更改 map1/map2 中值的类型。任何想法如何做到这一点?谢谢!

跟进问题:

现在我需要用相同的数据创建 map3,它计算“:”后右侧每个值的出现次数。并再次加入地图 2。这是 map3 的输出以及我需要的 map3 和 map2 的加入结果。

输出地图3:

(1,1)
(2,3)
(3,2)
(4,2)

加入地图2&地图3:

(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)

这是我使用的代码:

val map3 = data1.map(x => x.split(":")(0).toLong, x.split(":")(1))).flatMap{case (y,z) => z.split("\s+").map((_,1)}.reduceByKey(_+_)

val merge_map23 = map2.leftOuterJoin(map3)

我收到一个错误:

type mismatch; 
    found: org.apache.spark.rdd.RDD[String, Long] 
    required: org.apache.spark.rdd.RDD[(Long,?)]

我已经用下面的答案修复了以前的代码,但现在我得到了这个错误。谢谢

不要使用 countByKey。使用 reduceByKey:

val map1 = data1.map(x => x.split(":")(0), x.split(":")(1)))
  .flatMap{case (y,z) => z.split("\s+").map((y,_)}    
  .filter(_._2.nonEmpty).mapValues(_ => 1).reduceByKey(_ + _)

那就不要用collectAsMap:

val map2 = data2.zipWithIndex().map{ case(v, index) => (v,index + 1)}
  .map(pair => pair.swap)

终于加入

map1.join(map2)