如何制作这个..spark scala collection map

how to make this ..spark scala collection map

我的数据=

aaaa|1000
bbb|1000
ccc|1000
aaaa|1000
aaaa|2000
aaaa|3000
aaaa|2000
aaaa|1000
bbb|2000
bbb|2000
ccc|1000
ccc|1000
ccc|2000
ccc|3000
ccc|4000

我想计算每个文本标签的每个数值出现的次数:

aaaa||1000||3||2000||2||3000||1
bbb||2000||2||1000||1
ccc||1000||3||4000||1||2000||1||3000||1

这是我的代码

val UserShopRowData = inputData.map( s => (s.replace("|", " ").split(" "))).map( s => (s(0), s(1)))
val u1 = UserShopRowData.map(s=> (s, 1)).reduceByKey(_+_)
val u2 = u1.map(s => (s._1._1, s._1._2, s._2 ))
val u3 = u2.toLocalIterator.toList.sortBy(s => (s._1, s._3 )).reverse

这是我得到的结果:

(ccc,1000,3)
(ccc,4000,1)
(ccc,2000,1)
(ccc,3000,1)
(bbb,2000,2)
(bbb,1000,1)
(aaaa,1000,3)
(aaaa,2000,2)
(aaaa,3000,1)

请给我解决方案或建议。

input
.map(r=>r.split("\|"))          // do basic word count on input data first
.map(r=> ((r(0), r(1)),1))                        
.reduceByKey(_ + _)
.map(r=>(r._1._1,(r._1._2 + "||" + r._2))) // split key and aggregate again
.reduceByKey((a,b)=> a+"||" + b)
.map(r=>r._1 + "||" + r._2)

看起来你快完成了 - 你只需要另一个 groupBy 和一些映射来获得所需的结构。总而言之,这可以按如下方式完成:

// counting occurrences and reformatting into Tuple3's:
val countByTuple: RDD[(String, String, Int)] = inputData.map(_.split('|').toList)
  .map(s => (s, 1))
  .reduceByKey(_ + _)
  .map { case (List(label, number), count) => (label, number, count) }

// grouping by text label only, and reformatting into desired structure 
val result: RDD[(String, Iterable[(String, Int)])] = countByTuple.groupBy(_._1)
  .map { case (key, iter) => (key, iter.map(t => (t._2, t._3))) }

result.foreach(println)
// prints:
// (aaaa,List((1000,3), (2000,2), (3000,1)))
// (bbb,List((2000,2), (1000,1)))
// (ccc,List((1000,3), (4000,1), (3000,1), (2000,1)))