How to change the sample word count to return the labels?
The following code performs a word count over a collection of type org.apache.spark.rdd.RDD[(String, List[(String, Int)])]:
import org.apache.spark.rdd.RDD

val words: RDD[(String, List[(String, Int)])] =
  sc.parallelize(List(("a", List(("test", 1), ("test", 1)))))
val missingLabels: RDD[(String, Int)] =
  words.flatMap(m => m._2).reduceByKey((a, b) => a + b)
println("Labels Missing")
missingLabels.collect().foreach(println)
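On the sample data this prints (test,2): the flatMap keeps only the inner lists (m._2), so the outer label "a" is lost before the reduce.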
How can I keep the label, so that I extract the value ("a", ("test", 2)) rather than just ("test", 2)? In other words, how do I do this for an input of type RDD[(String, List[(String, Int)])]?
If I understand you correctly, you need to play around with the tuples a bit:
import org.apache.spark.rdd.RDD

val words: RDD[(String, List[(String, Int)])] =
  sc.parallelize(List(("a", List(("test", 1), ("test", 1)))))

// Re-key by word, carrying the label along inside the value.
val wordsWithLabels = words.flatMap {
  case (label, listOfValues) =>
    listOfValues.map {
      case (word, count) => (word, (label, count))
    }
}

// Sum the counts per word, then move the label back to the front.
val result = wordsWithLabels
  .reduceByKey {
    case ((label1, count1), (label2, count2)) =>
      (label1, count1 + count2)
  }
  .map {
    case (word, (label, count)) =>
      (label, (word, count))
  }

result.collect().foreach(println) // collect to the driver so the output is printed locally
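On the sample input this prints (a,(test,2)): both occurrences of "test" become ("test", ("a", 1)), reduceByKey sums the counts, and the final map moves the label back to the front. Note that the reduceByKey keeps label1 and discards label2, so this assumes every occurrence of a word carries the same label.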
If the keys inside a list can repeat, then I assume you want to reduce them all down to a single pair per key? If so:
def reduceList(list: List[(String, Int)]) =
  list.groupBy(_._1).mapValues(_.aggregate(0)(_ + _._2, _ + _))

val words: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] =
  sc.parallelize(List(("a", List(("test", 1), ("test", 1)))))

// First merge duplicate words inside each record's own list...
val mergedList = words.mapValues((list: List[(String, Int)]) => reduceList(list).toList)

// ...then merge the lists of any records that share the same outer key.
val missingLabels = mergedList.reduceByKey((accum: List[(String, Int)], value: List[(String, Int)]) => {
  val valueMap = value.toMap
  val accumMap = accum.toMap
  val mergedMap = accumMap ++ valueMap.map { case (k, v) => k -> (v + accumMap.getOrElse(k, 0)) }
  mergedMap.toList
})

missingLabels.collect().foreach(println) // collect to the driver before printing
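On the sample input this prints (a,List((test,2))). As a quick sanity check outside Spark, reduceList can be run on a plain list (a minimal sketch; the ("other", 3) entry is a made-up extra word for illustration):

reduceList(List(("test", 1), ("test", 1), ("other", 3)))
// => Map(test -> 2, other -> 3)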