如何使用 combineByKey RDD 对多个字段进行分组和聚合?
How to groupby and aggregate multiple fields using combineByKey RDD?
我有一个示例文件,我试图使用 combineByKey
找出另一个字段的给定字段总数及其计数和另一个字段的值列表。我正在尝试理解 combineByKey
,我使用 aggregateByKey
从 理解的相同要求,现在我想理解 combineByKey
.
我尝试了以下与 aggregateByKey
相同的代码,但出现类型不匹配错误。我不确定我的类型是否适合 createCombiner
或 mergeValue
或 mergeCombiner
。请帮助我更好地理解 combineByKey
。
示例数据:
44,8602,37.19
44,8331,99.19
44,1919,39.54
44,2682,41.88
44,7366,66.54
44,3405,81.09
44,9957,94.79
combineByKey
的代码:
val rdd = sc.textFile("file:///../customer-orders.csv_sample").map(x => x.split(",")).map(x => (x(0).toInt, (x(1).toInt, x(2).toFloat)))
def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)
def mergeValue = (acc: (Seq[Int],Double,Int),xs: (Int,Float)) => {
println(s"""mergeValue: (${acc._1} ++ ${Seq(xs._1)}, ${acc._2} +${xs._2},${acc._3} + 1)""")
(acc._1 ++ Seq(xs._1), acc._2 + xs._2, acc._3 + 1)
}
def mergeCombiner = (acc1: (Seq[Int],Double,Int), acc2: (Seq[Int],Double,Int)) => {
println(s"""mergeCombiner: (${acc1._1} ++ ${acc2._1}, ${acc1._2} +${acc2._2}, ${acc1._3} + ${acc2._3})""")
(acc1._1 ++ acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
错误信息:
error: type mismatch;
found : ((Seq[Int], Double, Int)) => ((Seq[Int], Double, Int), Int)
required: ((Int, Float)) => ?
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
^
预期的结果是:
customerid, (orderids,..,..,....), totalamount, number of orderids
使用提供的示例数据,它将是:
(44,(List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.2200012207031,7))
不匹配指向 createCombiner
。谁能帮我理解一下 combineByKey
?
我对 Spark 不熟悉。
希望对您有所帮助。
val array = Array((44,8602,37.19),(44,8331,99.19),(44,1919,39.54),(44,2682,41.88),(44,7366,66.54),(44,3405,81.09),(44,9957,94.79))
array.groupBy(_._1).map(e => (e._1, e._2.map(_._2).toList, e._2.map(_._3).sum))
//res1: scala.collection.immutable.Iterable[(Int, List[Int], Double)] = List((44,List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.21999999999997))
我看到你的这个错误是由于
def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)
我认为 createCombiner
应该采用一些 Seq
的元组和 return 的 Int
和 Seq
的元组(groupby)
def createCombiner = (tuple: Seq[(Int,Int, Double)]) => tuple.groupBy(_._1)
希望对您有所帮助。
这里是combineByKey的签名:
combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
mergeValue 的类型为 (C, V) => C
其中 C 应为 ((Seq[Int],Double, Int), Int)
,V 应为 (Seq[Int],Double, Int)
您的 mergeValue 方法具有类型 C (Seq[Int],Double,Int)
和 V (Int,Float)
mergeCombiner的类型也不对
这应该是 (C, C) => C
其中 C 是 ((Seq[Int],Double, Int), Int)
这里的问题是 createCombiner
函数。看看combineByKey
:
combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
简单地说,C
是您希望以 ((Seq[Int], Double, Int)
) 结束的格式,V
是您开始的格式 ((Int, Double)
)。在这里,我将 Float
更改为 Double
,因为这是 Spark 中通常使用的。这意味着 createCombiner
函数应该如下所示:
def createCombiner = (tuple: (Int, Double)) => (Seq(tuple._1), tuple._2, 1)
mergeValue
和 mergeCombiner
看起来都不错,但是,如果您在集群上执行代码,您将不会在 Spark 中看到任何打印语句(参见:Spark losing println() on stdout)。
我有一个示例文件,我试图使用 combineByKey
找出另一个字段的给定字段总数及其计数和另一个字段的值列表。我正在尝试理解 combineByKey
,我使用 aggregateByKey
从 combineByKey
.
我尝试了以下与 aggregateByKey
相同的代码,但出现类型不匹配错误。我不确定我的类型是否适合 createCombiner
或 mergeValue
或 mergeCombiner
。请帮助我更好地理解 combineByKey
。
示例数据:
44,8602,37.19
44,8331,99.19
44,1919,39.54
44,2682,41.88
44,7366,66.54
44,3405,81.09
44,9957,94.79
combineByKey
的代码:
val rdd = sc.textFile("file:///../customer-orders.csv_sample").map(x => x.split(",")).map(x => (x(0).toInt, (x(1).toInt, x(2).toFloat)))
def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)
def mergeValue = (acc: (Seq[Int],Double,Int),xs: (Int,Float)) => {
println(s"""mergeValue: (${acc._1} ++ ${Seq(xs._1)}, ${acc._2} +${xs._2},${acc._3} + 1)""")
(acc._1 ++ Seq(xs._1), acc._2 + xs._2, acc._3 + 1)
}
def mergeCombiner = (acc1: (Seq[Int],Double,Int), acc2: (Seq[Int],Double,Int)) => {
println(s"""mergeCombiner: (${acc1._1} ++ ${acc2._1}, ${acc1._2} +${acc2._2}, ${acc1._3} + ${acc2._3})""")
(acc1._1 ++ acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
错误信息:
error: type mismatch;
found : ((Seq[Int], Double, Int)) => ((Seq[Int], Double, Int), Int)
required: ((Int, Float)) => ?
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
^
预期的结果是:
customerid, (orderids,..,..,....), totalamount, number of orderids
使用提供的示例数据,它将是:
(44,(List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.2200012207031,7))
不匹配指向 createCombiner
。谁能帮我理解一下 combineByKey
?
我对 Spark 不熟悉。
希望对您有所帮助。
val array = Array((44,8602,37.19),(44,8331,99.19),(44,1919,39.54),(44,2682,41.88),(44,7366,66.54),(44,3405,81.09),(44,9957,94.79))
array.groupBy(_._1).map(e => (e._1, e._2.map(_._2).toList, e._2.map(_._3).sum))
//res1: scala.collection.immutable.Iterable[(Int, List[Int], Double)] = List((44,List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.21999999999997))
我看到你的这个错误是由于
def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)
我认为 createCombiner
应该采用一些 Seq
的元组和 return 的 Int
和 Seq
的元组(groupby)
def createCombiner = (tuple: Seq[(Int,Int, Double)]) => tuple.groupBy(_._1)
希望对您有所帮助。
这里是combineByKey的签名:
combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
mergeValue 的类型为 (C, V) => C
其中 C 应为 ((Seq[Int],Double, Int), Int)
,V 应为 (Seq[Int],Double, Int)
您的 mergeValue 方法具有类型 C (Seq[Int],Double,Int)
和 V (Int,Float)
mergeCombiner的类型也不对
这应该是 (C, C) => C
其中 C 是 ((Seq[Int],Double, Int), Int)
这里的问题是 createCombiner
函数。看看combineByKey
:
combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
简单地说,C
是您希望以 ((Seq[Int], Double, Int)
) 结束的格式,V
是您开始的格式 ((Int, Double)
)。在这里,我将 Float
更改为 Double
,因为这是 Spark 中通常使用的。这意味着 createCombiner
函数应该如下所示:
def createCombiner = (tuple: (Int, Double)) => (Seq(tuple._1), tuple._2, 1)
mergeValue
和 mergeCombiner
看起来都不错,但是,如果您在集群上执行代码,您将不会在 Spark 中看到任何打印语句(参见:Spark losing println() on stdout)。