Scala RDD 按范围计数
Scala RDD count by range
我需要 "extract" Iterable[MyObject] 中包含的一些数据(它是 groupBy 之前的 RDD[MyObject])。
我的初始 RDD[MyObject] :
|-----------|---------|----------|
| startCity | endCity | Customer |
|-----------|---------|----------|
| Paris | London | ID | Age |
| | |----|-----|
| | | 1 | 1 |
| | |----|-----|
| | | 2 | 1 |
| | |----|-----|
| | | 3 | 50 |
|-----------|---------|----------|
| Paris | London | ID | Age |
| | |----|-----|
| | | 5 | 40 |
| | |----|-----|
| | | 6 | 41 |
| | |----|-----|
| | | 7 | 2 |
|-----------|---------|----|-----|
| New-York | Paris | ID | Age |
| | |----|-----|
| | | 9 | 15 |
| | |----|-----|
| | | 10| 16 |
| | |----|-----|
| | | 11| 46 |
|-----------|---------|----|-----|
| New-York | Paris | ID | Age |
| | |----|-----|
| | | 13| 7 |
| | |----|-----|
| | | 14| 9 |
| | |----|-----|
| | | 15| 60 |
|-----------|---------|----|-----|
| Barcelona | London | ID | Age |
| | |----|-----|
| | | 17| 66 |
| | |----|-----|
| | | 18| 53 |
| | |----|-----|
| | | 19| 11 |
|-----------|---------|----|-----|
我需要按年龄范围和 groupBy startCity - endCity
对他们进行计数
最终结果应该是:
|-----------|---------|-------------|
| startCity | endCity | Customer |
|-----------|---------|-------------|
| Paris | London | Range| Count|
| | |------|------|
| | |0-2 | 3 |
| | |------|------|
| | |3-18 | 0 |
| | |------|------|
| | |19-99 | 3 |
|-----------|---------|-------------|
| New-York | Paris | Range| Count|
| | |------|------|
| | |0-2 | 0 |
| | |------|------|
| | |3-18 | 3 |
| | |------|------|
| | |19-99 | 2 |
|-----------|---------|-------------|
| Barcelona | London | Range| Count|
| | |------|------|
| | |0-2 | 0 |
| | |------|------|
| | |3-18 | 1 |
| | |------|------|
| | |19-99 | 2 |
|-----------|---------|-------------|
目前我正在通过对相同数据进行 3 次计数来执行此操作(第一次范围为 0-2,然后是 10-20,然后是 21-99)。
喜欢:
Iterable[MyObject] ite
ite.count(x => x.age match {
case Some(age) => { age >= 0 && age < 2 }
}
它通过给我一个整数来工作,但我认为效率不高,因为我必须数很多次,请问最好的方法是什么?
谢谢
编辑:客户对象是一个案例class
def computeRange(age : Int) =
if(age<=2)
"0-2"
else if(age<=10)
"2-10"
// etc, you get the idea
然后,RDD 为 case class MyObject(id : String, age : Int)
rdd
.map(x=> computeRange(x.age) -> 1)
.reduceByKey(_+_)
编辑:
如果你需要按某些列分组,你可以这样做,前提是你有一个 RDD[(SomeColumns, Iterable[MyObject])]。以下几行将为您提供一个将每个 "range" 与其出现次数相关联的地图。
def computeMapOfOccurances(list : Iterable[MyObject]) : Map[String, Int] =
list
.map(_.age)
.map(computeRange)
.groupBy(x=>x)
.mapValues(_.size)
val result1 = rdd
.mapValues( computeMapOfOccurances(_))
如果你需要对数据进行扁平化,你可以这样写:
val result2 = result1
.flatMapValues(_.toSeq)
假设您有 Customer[Object]
作为 case class
,如下所示
case class Customer(ID: Int, Age: Int)
你的 RDD[MyObject]
是 case class
的 rdd
,如下所示
case class MyObject(startCity: String, endCity: String, customer: List[Customer])
所以使用上面的 case class
es 你应该有如下输入(你有 table 格式)
MyObject(Paris,London,List(Customer(1,1), Customer(2,1), Customer(3,50)))
MyObject(Paris,London,List(Customer(5,40), Customer(6,41), Customer(7,2)))
MyObject(New-York,Paris,List(Customer(9,15), Customer(10,16), Customer(11,46)))
MyObject(New-York,Paris,List(Customer(13,7), Customer(14,9), Customer(15,60)))
MyObject(Barcelona,London,List(Customer(17,66), Customer(18,53), Customer(19,11)))
你还提到分组后你有 Iterable[MyObject]
相当于下面的步骤
val groupedRDD = rdd.groupBy(myobject => (myobject.startCity, myobject.endCity)) //groupedRDD: org.apache.spark.rdd.RDD[((String, String), Iterable[MyObject])] = ShuffledRDD[2] at groupBy at worksheetTest.sc:23
所以下一步你要做的就是用mapValues
遍历Iterable[MyObject]
,然后统计属于每个范围的age
,最后转换为您需要的输出如下
val finalResult = groupedRDD.mapValues(x => {
val rangeAge = Map("0-2" -> 0, "3-18" -> 0, "19-99" -> 0)
val list = x.flatMap(y => y.customer.map(z => z.Age)).toList
updateCounts(list, rangeAge).map(x => CustomerOut(x._1, x._2)).toList
})
其中 updateCounts
是一个 递归函数
def updateCounts(ageList: List[Int], map: Map[String, Int]) : Map[String, Int] = ageList match{
case head :: tail => if(head >= 0 && head < 3) {
updateCounts(tail, map ++ Map("0-2" -> (map("0-2")+1)))
} else if(head >= 3 && head < 19) {
updateCounts(tail, map ++ Map("3-18" -> (map("3-18")+1)))
} else updateCounts(tail, map ++ Map("19-99" -> (map("19-99")+1)))
case Nil => map
}
和CustomerOut
是另一个case class
case class CustomerOut(Range: String, Count: Int)
所以finalResult
如下
((Barcelona,London),List(CustomerOut(0-2,0), CustomerOut(3-18,1), CustomerOut(19-99,2)))
((New-York,Paris),List(CustomerOut(0-2,0), CustomerOut(3-18,4), CustomerOut(19-99,2)))
((Paris,London),List(CustomerOut(0-2,3), CustomerOut(3-18,0), CustomerOut(19-99,3)))
我需要 "extract" Iterable[MyObject] 中包含的一些数据(它是 groupBy 之前的 RDD[MyObject])。
我的初始 RDD[MyObject] :
|-----------|---------|----------|
| startCity | endCity | Customer |
|-----------|---------|----------|
| Paris | London | ID | Age |
| | |----|-----|
| | | 1 | 1 |
| | |----|-----|
| | | 2 | 1 |
| | |----|-----|
| | | 3 | 50 |
|-----------|---------|----------|
| Paris | London | ID | Age |
| | |----|-----|
| | | 5 | 40 |
| | |----|-----|
| | | 6 | 41 |
| | |----|-----|
| | | 7 | 2 |
|-----------|---------|----|-----|
| New-York | Paris | ID | Age |
| | |----|-----|
| | | 9 | 15 |
| | |----|-----|
| | | 10| 16 |
| | |----|-----|
| | | 11| 46 |
|-----------|---------|----|-----|
| New-York | Paris | ID | Age |
| | |----|-----|
| | | 13| 7 |
| | |----|-----|
| | | 14| 9 |
| | |----|-----|
| | | 15| 60 |
|-----------|---------|----|-----|
| Barcelona | London | ID | Age |
| | |----|-----|
| | | 17| 66 |
| | |----|-----|
| | | 18| 53 |
| | |----|-----|
| | | 19| 11 |
|-----------|---------|----|-----|
我需要按年龄范围和 groupBy startCity - endCity
对他们进行计数最终结果应该是:
|-----------|---------|-------------|
| startCity | endCity | Customer |
|-----------|---------|-------------|
| Paris | London | Range| Count|
| | |------|------|
| | |0-2 | 3 |
| | |------|------|
| | |3-18 | 0 |
| | |------|------|
| | |19-99 | 3 |
|-----------|---------|-------------|
| New-York | Paris | Range| Count|
| | |------|------|
| | |0-2 | 0 |
| | |------|------|
| | |3-18 | 3 |
| | |------|------|
| | |19-99 | 2 |
|-----------|---------|-------------|
| Barcelona | London | Range| Count|
| | |------|------|
| | |0-2 | 0 |
| | |------|------|
| | |3-18 | 1 |
| | |------|------|
| | |19-99 | 2 |
|-----------|---------|-------------|
目前我正在通过对相同数据进行 3 次计数来执行此操作(第一次范围为 0-2,然后是 10-20,然后是 21-99)。
喜欢:
Iterable[MyObject] ite
ite.count(x => x.age match {
case Some(age) => { age >= 0 && age < 2 }
}
它通过给我一个整数来工作,但我认为效率不高,因为我必须数很多次,请问最好的方法是什么?
谢谢
编辑:客户对象是一个案例class
def computeRange(age : Int) =
if(age<=2)
"0-2"
else if(age<=10)
"2-10"
// etc, you get the idea
然后,RDD 为 case class MyObject(id : String, age : Int)
rdd
.map(x=> computeRange(x.age) -> 1)
.reduceByKey(_+_)
编辑: 如果你需要按某些列分组,你可以这样做,前提是你有一个 RDD[(SomeColumns, Iterable[MyObject])]。以下几行将为您提供一个将每个 "range" 与其出现次数相关联的地图。
def computeMapOfOccurances(list : Iterable[MyObject]) : Map[String, Int] =
list
.map(_.age)
.map(computeRange)
.groupBy(x=>x)
.mapValues(_.size)
val result1 = rdd
.mapValues( computeMapOfOccurances(_))
如果你需要对数据进行扁平化,你可以这样写:
val result2 = result1
.flatMapValues(_.toSeq)
假设您有 Customer[Object]
作为 case class
,如下所示
case class Customer(ID: Int, Age: Int)
你的 RDD[MyObject]
是 case class
的 rdd
,如下所示
case class MyObject(startCity: String, endCity: String, customer: List[Customer])
所以使用上面的 case class
es 你应该有如下输入(你有 table 格式)
MyObject(Paris,London,List(Customer(1,1), Customer(2,1), Customer(3,50)))
MyObject(Paris,London,List(Customer(5,40), Customer(6,41), Customer(7,2)))
MyObject(New-York,Paris,List(Customer(9,15), Customer(10,16), Customer(11,46)))
MyObject(New-York,Paris,List(Customer(13,7), Customer(14,9), Customer(15,60)))
MyObject(Barcelona,London,List(Customer(17,66), Customer(18,53), Customer(19,11)))
你还提到分组后你有 Iterable[MyObject]
相当于下面的步骤
val groupedRDD = rdd.groupBy(myobject => (myobject.startCity, myobject.endCity)) //groupedRDD: org.apache.spark.rdd.RDD[((String, String), Iterable[MyObject])] = ShuffledRDD[2] at groupBy at worksheetTest.sc:23
所以下一步你要做的就是用mapValues
遍历Iterable[MyObject]
,然后统计属于每个范围的age
,最后转换为您需要的输出如下
val finalResult = groupedRDD.mapValues(x => {
val rangeAge = Map("0-2" -> 0, "3-18" -> 0, "19-99" -> 0)
val list = x.flatMap(y => y.customer.map(z => z.Age)).toList
updateCounts(list, rangeAge).map(x => CustomerOut(x._1, x._2)).toList
})
其中 updateCounts
是一个 递归函数
def updateCounts(ageList: List[Int], map: Map[String, Int]) : Map[String, Int] = ageList match{
case head :: tail => if(head >= 0 && head < 3) {
updateCounts(tail, map ++ Map("0-2" -> (map("0-2")+1)))
} else if(head >= 3 && head < 19) {
updateCounts(tail, map ++ Map("3-18" -> (map("3-18")+1)))
} else updateCounts(tail, map ++ Map("19-99" -> (map("19-99")+1)))
case Nil => map
}
和CustomerOut
是另一个case class
case class CustomerOut(Range: String, Count: Int)
所以finalResult
如下
((Barcelona,London),List(CustomerOut(0-2,0), CustomerOut(3-18,1), CustomerOut(19-99,2)))
((New-York,Paris),List(CustomerOut(0-2,0), CustomerOut(3-18,4), CustomerOut(19-99,2)))
((Paris,London),List(CustomerOut(0-2,3), CustomerOut(3-18,0), CustomerOut(19-99,3)))