Spark Scala: merge multiple RDDs by key

I have multiple RDDs:

genderRDD

(1713926427,{gender={f=3327, m=1945, unknown=897}})

actionRDD

(1713926427,{actionType={repost=2927, comment=2345, like=897}})

deviceRDD

(1713926427,{deviceType={iphone=2999, android=12321}})

Each one has the form RDD[(id, HashMap[String, HashMap[String, Integer]])].

I want to merge these RDDs by id, which should produce:

(1713926427,{gender={f=3327, m=1945, unknown=897},actionType={repost=2927, comment=2345, like=897},deviceType={iphone=2999, android=12321}})

That way I can store it in a database. What is the usual way to do this?

I'm assuming {deviceType={iphone=2999, android=12321}} represents a Map[String, Map[String, Int]]. In that case, you can simply join the RDDs and then apply a simple map to "flatten" the result:

// some sample data:
val rdd1 = sc.parallelize(Seq((1713926427, Map("gender" -> Map("f" -> 3327, "m" -> 1945, "unknown" -> 897)))))
val rdd2 = sc.parallelize(Seq((1713926427, Map("actionType" -> Map("repost" -> 2927, "comment" -> 2345, "like" -> 897)))))
val rdd3 = sc.parallelize(Seq((1713926427, Map("deviceType" -> Map("iphone" -> 2999, "android" -> 12321)))))

// join all three RDDs and map to flatten the value:
val result = rdd1
  .join(rdd2)
  .join(rdd3)
  .map { case (id, ((gender, action), device)) => (id, (gender ++ action ++ device)) }

// result has type RDD[(Int, Map[String, Map[String, Int]])]
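// collect() brings the rows to the driver, so the output shows up there rather than in executor logs
result.collect().foreach(println)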
// prints:
// (1713926427,Map(gender -> Map(f -> 3327, m -> 1945, unknown -> 897), actionType -> Map(repost -> 2927, comment -> 2345, like -> 897), deviceType -> Map(iphone -> 2999, android -> 12321)))
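Note that join is an inner join, so an id missing from any one of the three RDDs is dropped from the result. If that matters, or if the number of RDDs varies, one possible alternative is to union them and combine the values per key. The following is only a sketch under the same Map[String, Map[String, Int]] assumption; the names MergeByKey, Stats, and mergeAll are made up for illustration, and the local master is assumed just for trying it out:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object MergeByKey {
  // Hypothetical alias for the value type assumed throughout this answer.
  type Stats = Map[String, Map[String, Int]]

  // Union all inputs, then combine the maps that share an id.
  // With ++, if two maps contain the same outer key, the right-hand entry replaces the left one.
  // Unlike join, ids that appear in only some of the inputs are kept.
  def mergeAll(sc: SparkContext, rdds: Seq[RDD[(Int, Stats)]]): RDD[(Int, Stats)] =
    sc.union(rdds).reduceByKey(_ ++ _)

  def main(args: Array[String]): Unit = {
    // Assumption: a local master, purely for experimenting.
    val conf = new SparkConf().setAppName("merge-by-key").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val gender: RDD[(Int, Stats)] = sc.parallelize(Seq(
        (1713926427, Map("gender" -> Map("f" -> 3327, "m" -> 1945, "unknown" -> 897)))))
      val action: RDD[(Int, Stats)] = sc.parallelize(Seq(
        (1713926427, Map("actionType" -> Map("repost" -> 2927, "comment" -> 2345, "like" -> 897)))))
      val device: RDD[(Int, Stats)] = sc.parallelize(Seq(
        (1713926427, Map("deviceType" -> Map("iphone" -> 2999, "android" -> 12321)))))

      // Bring the merged rows to the driver and print them.
      mergeAll(sc, Seq(gender, action, device)).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The order of the outer keys in the printed maps may differ from the join version, because reduceByKey does not guarantee the order in which values are combined.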