如何在 Scala 的 RddPair<K,Tuple> 中使用 reduceByKey
How reduceByKey in RddPair<K,Tuple> in Scala
我有一个 CassandraTable。通过 SparkContext.cassandraTable() 访问。检索我所有的 CassandraRow。
现在我要存储3个信息:(用户,城市,字节)
我是这样储存的
rddUsersFilter.map(row =>
(row.getString("user"),(row.getString("city"),row.getString("byte").replace(",","").toLong))).groupByKey
我得到一个 RDD[(String, Iterable[(String, Long)])]
现在,对于每个用户,我想对所有字节求和并为城市创建一个地图,例如:"city"->"occurencies"(这个城市为这个用户应用了多少次)。
之前,我将此代码拆分为两个不同的 RDD,一个用于对字节求和,另一个用于创建映射。
城市出现示例
rddUsers.map(user => (user._1, user._2.size, user._2.groupBy(identity).map(city => (city._1,city._2.size))))
那是因为我可以通过 ._2 方法访问元组的第二个元素。但现在?
我的第二个元素是 Iterable[(String,Long)],我不能像以前那样映射了。
有没有一种解决方案可以只用一个 rdd 和一个 MapReduce 来检索我的所有信息?
您可以通过首先对用户、城市的字节和城市出现进行分组然后按用户进行分组来轻松完成此操作
val data = Array(("user1","city1",100),("user1","city1",100),
("user1","city1",100),("user1","city2",100),("user1","city2",100),
("user1","city3",100),("user1","city2",100),("user2","city1",100),
("user2","city2",100))
val rdd = sc.parallelize(data)
val res = rdd.map(x=> ((x._1,x._2),(1,x._3)))
.reduceByKey((x,y)=> (x._1+y._1,x._2+y._2))
.map(x => (x._1._1,(x._1._2,x._2._1,x._2._2)))
.groupByKey
val userCityUsageRdd = res.map(x => {
val m = x._2.toList
(x._1 ,m.map(y => (y._1->y._2)).toMap, m.map(x => x._3).reduce(_+_))
})
输出
res20: Array[(String, scala.collection.immutable.Map[String,Int], Int)] =
Array((user1,Map(city1 -> 3, city3 -> 1, city2 -> 3),700),
(user2,Map(city1 -> 1, city2 -> 1),200))
我有一个 CassandraTable。通过 SparkContext.cassandraTable() 访问。检索我所有的 CassandraRow。
现在我要存储3个信息:(用户,城市,字节) 我是这样储存的
rddUsersFilter.map(row =>
(row.getString("user"),(row.getString("city"),row.getString("byte").replace(",","").toLong))).groupByKey
我得到一个 RDD[(String, Iterable[(String, Long)])] 现在,对于每个用户,我想对所有字节求和并为城市创建一个地图,例如:"city"->"occurencies"(这个城市为这个用户应用了多少次)。
之前,我将此代码拆分为两个不同的 RDD,一个用于对字节求和,另一个用于创建映射。
城市出现示例
rddUsers.map(user => (user._1, user._2.size, user._2.groupBy(identity).map(city => (city._1,city._2.size))))
那是因为我可以通过 ._2 方法访问元组的第二个元素。但现在? 我的第二个元素是 Iterable[(String,Long)],我不能像以前那样映射了。
有没有一种解决方案可以只用一个 rdd 和一个 MapReduce 来检索我的所有信息?
您可以通过首先对用户、城市的字节和城市出现进行分组然后按用户进行分组来轻松完成此操作
val data = Array(("user1","city1",100),("user1","city1",100),
("user1","city1",100),("user1","city2",100),("user1","city2",100),
("user1","city3",100),("user1","city2",100),("user2","city1",100),
("user2","city2",100))
val rdd = sc.parallelize(data)
val res = rdd.map(x=> ((x._1,x._2),(1,x._3)))
.reduceByKey((x,y)=> (x._1+y._1,x._2+y._2))
.map(x => (x._1._1,(x._1._2,x._2._1,x._2._2)))
.groupByKey
val userCityUsageRdd = res.map(x => {
val m = x._2.toList
(x._1 ,m.map(y => (y._1->y._2)).toMap, m.map(x => x._3).reduce(_+_))
})
输出
res20: Array[(String, scala.collection.immutable.Map[String,Int], Int)] =
Array((user1,Map(city1 -> 3, city3 -> 1, city2 -> 3),700),
(user2,Map(city1 -> 1, city2 -> 1),200))