如何使用 combineByKey RDD 对多个字段进行分组和聚合?

How to groupby and aggregate multiple fields using combineByKey RDD?

我有一个示例文件,我试图使用 combineByKey 找出另一个字段的给定字段总数及其计数和另一个字段的值列表。我正在尝试理解 combineByKey,我使用 aggregateByKey 理解的相同要求,现在我想理解 combineByKey.

我尝试了以下与 aggregateByKey 相同的代码,但出现类型不匹配错误。我不确定我的类型是否适合 createCombinermergeValuemergeCombiner。请帮助我更好地理解 combineByKey

示例数据:

44,8602,37.19
44,8331,99.19
44,1919,39.54
44,2682,41.88
44,7366,66.54
44,3405,81.09
44,9957,94.79 

combineByKey的代码:

val rdd = sc.textFile("file:///../customer-orders.csv_sample").map(x => x.split(",")).map(x => (x(0).toInt, (x(1).toInt, x(2).toFloat)))

def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)

def mergeValue = (acc: (Seq[Int],Double,Int),xs: (Int,Float)) => {
  println(s"""mergeValue: (${acc._1} ++ ${Seq(xs._1)}, ${acc._2} +${xs._2},${acc._3} + 1)""")
  (acc._1 ++ Seq(xs._1), acc._2 + xs._2, acc._3 + 1)
}

def mergeCombiner = (acc1: (Seq[Int],Double,Int), acc2: (Seq[Int],Double,Int)) => {
  println(s"""mergeCombiner: (${acc1._1} ++ ${acc2._1}, ${acc1._2} +${acc2._2}, ${acc1._3} + ${acc2._3})""")
  (acc1._1 ++ acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}

rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)

错误信息:

error: type mismatch;
found   : ((Seq[Int], Double, Int)) => ((Seq[Int], Double, Int), Int)
required: ((Int, Float)) => ?
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
                 ^

预期的结果是:

customerid, (orderids,..,..,....), totalamount, number of orderids

使用提供的示例数据,它将是:

(44,(List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.2200012207031,7))

不匹配指向 createCombiner。谁能帮我理解一下 combineByKey?

我对 Spark 不熟悉。

希望对您有所帮助。

val array = Array((44,8602,37.19),(44,8331,99.19),(44,1919,39.54),(44,2682,41.88),(44,7366,66.54),(44,3405,81.09),(44,9957,94.79))

array.groupBy(_._1).map(e => (e._1, e._2.map(_._2).toList, e._2.map(_._3).sum))
//res1: scala.collection.immutable.Iterable[(Int, List[Int], Double)] = List((44,List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.21999999999997))

我看到你的这个错误是由于

def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)

我认为 createCombiner 应该采用一些 Seq 的元组和 return 的 IntSeq 的元组(groupby)

def createCombiner = (tuple: Seq[(Int,Int, Double)]) => tuple.groupBy(_._1)

希望对您有所帮助。

这里是combineByKey的签名:

combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

mergeValue 的类型为 (C, V) => C

其中 C 应为 ((Seq[Int],Double, Int), Int),V 应为 (Seq[Int],Double, Int)

您的 mergeValue 方法具有类型 C (Seq[Int],Double,Int) 和 V (Int,Float)

mergeCombiner的类型也不对

这应该是 (C, C) => C 其中 C 是 ((Seq[Int],Double, Int), Int)

这里的问题是 createCombiner 函数。看看combineByKey:

combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

简单地说,C 是您希望以 ((Seq[Int], Double, Int)) 结束的格式,V 是您开始的格式 ((Int, Double))。在这里,我将 Float 更改为 Double,因为这是 Spark 中通常使用的。这意味着 createCombiner 函数应该如下所示:

def createCombiner = (tuple: (Int, Double)) => (Seq(tuple._1), tuple._2, 1)

mergeValuemergeCombiner 看起来都不错,但是,如果您在集群上执行代码,您将不会在 Spark 中看到任何打印语句(参见:Spark losing println() on stdout)。