Spark Scala: merge multiple RDDs by key
I have multiple RDDs:
genderRDD
(1713926427,{gender={f=3327, m=1945, unknown=897}})
and
actionRDD
(1713926427,{actionType={repost=2927, comment=2345, like=897}})
and
deviceRDD
(1713926427,{deviceType={iphone=2999, android=12321}})
Their format is RDD(id, HashMap[String, HashMap[String, Integer]]).
I want to merge these RDDs by id, which should produce:
(1713926427,{gender={f=3327, m=1945, unknown=897},actionType={repost=2927, comment=2345, like=897},device={iphone=2999, android=12321}})
so that I can store it in a database. What is the usual way to do this?
I'm assuming that {deviceType={iphone=2999, android=12321}} represents a Map[String, Map[String, Int]]. In that case, you can simply use joins, followed by a simple map to "flatten" the result:
// some sample data:
val rdd1 = sc.parallelize(Seq((1713926427, Map("gender" -> Map("f" -> 3327, "m" -> 1945, "unknown" -> 897)))))
val rdd2 = sc.parallelize(Seq((1713926427, Map("actionType" -> Map("repost" -> 2927, "comment" -> 2345, "like" -> 897)))))
val rdd3 = sc.parallelize(Seq((1713926427, Map("deviceType" -> Map("iphone" -> 2999, "android" -> 12321)))))
// join all three RDDs and map to flatten the value:
val result = rdd1
  .join(rdd2)
  .join(rdd3)
  .map { case (id, ((gender, action), device)) => (id, gender ++ action ++ device) }
// result has type RDD[(Int, Map[String, Map[String, Int]])]
result.foreach(println)
// prints:
// (1713926427,Map(gender -> Map(f -> 3327, m -> 1945, unknown -> 897), actionType -> Map(repost -> 2927, comment -> 2345, like -> 897), deviceType -> Map(iphone -> 2999, android -> 12321)))
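Note that join is an inner join, so an id that is missing from any one of the RDDs is dropped from the result. If that matters, or if the number of RDDs varies, one possible alternative is to union the RDDs and combine the values per id with reduceByKey. The following is only a sketch under the same assumption about the value type; the names MergeByKey, Stats, mergeStats and mergeAll are made up for illustration, and the local SparkSession is there only to make the example self-contained:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object MergeByKey {
  // Alias for the value type assumed in the answer above.
  type Stats = Map[String, Map[String, Int]]

  // Combines two values that belong to the same id. With `++`, if both sides
  // contain the same outer key (e.g. "gender"), the right-hand entry wins.
  def mergeStats(a: Stats, b: Stats): Stats = a ++ b

  // Hypothetical helper: union all inputs, then merge the values per id.
  // Needs a non-empty Seq, because `reduce` throws on an empty collection.
  def mergeAll(rdds: Seq[RDD[(Int, Stats)]]): RDD[(Int, Stats)] =
    rdds.reduce((a, b) => a.union(b)).reduceByKey((a, b) => mergeStats(a, b))

  def main(args: Array[String]): Unit = {
    // Local session for demonstration only (an assumption, not part of the question).
    val spark = SparkSession.builder().master("local[*]").appName("merge-by-key").getOrCreate()
    val sc = spark.sparkContext

    val rdd1: RDD[(Int, Stats)] = sc.parallelize(Seq(
      (1713926427, Map("gender" -> Map("f" -> 3327, "m" -> 1945, "unknown" -> 897)))))
    val rdd2: RDD[(Int, Stats)] = sc.parallelize(Seq(
      (1713926427, Map("actionType" -> Map("repost" -> 2927, "comment" -> 2345, "like" -> 897)))))
    val rdd3: RDD[(Int, Stats)] = sc.parallelize(Seq(
      (1713926427, Map("deviceType" -> Map("iphone" -> 2999, "android" -> 12321)))))

    // collect() brings the merged records to the driver before printing.
    mergeAll(Seq(rdd1, rdd2, rdd3)).collect().foreach(println)

    spark.stop()
  }
}
```

With this approach an id that appears in only some of the RDDs is kept, carrying whatever sub-maps exist for it. Whether that is preferable to the inner-join behaviour depends on what the database expects.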