无法使用 Spark Scala 应用 AggregateByKey 来获取给定输入(id,值)的(id,total,(max,min))

Not able to apply AggregateByKey using Spark Scala to get (id, total,( max, min)) for given input(id, value)

我试过这段代码,但它说元组不能被解构。

forditems.map(stre=>(stre.split(",")(1).toInt, stre.split(",")(4).toFloat)).aggregateByKey((0.0f, (0.0f, 0.0f)))(
(t,v) => (t._1 + v, ( if(v > t._2._1) v else  t._2._1 , if(v > t._2._2) v else  t._2._2 ))
(x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else  x._2._1, if(x._2_.2 < t._2._2) t._2._2 else  x._2._2 )))

错误信息

<console>:7: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
  Either create a single parameter accepting the Tuple1,
  or consider a pattern matching anonymous function: `{ case (param1, param1) => ... }
 (x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else  x._2._1, 
 if(x._2_.2 < t._2._2) t._2._2 else  x._2._2 ))).filter(stre=> 
 stre._1==2).take(10).foreach(println)
 ^
 <console>:7: error: ')' expected but double literal found.
 (x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else  x._2._1, 
 if(x._2_.2 < t._2._2) t._2._2 else  x._2._2 ))).filter(stre=> 
 stre._1==2).take(10).foreach(println)

这里实际上只有很小的句法错误:

  • 传递给 aggregateByKey 的第一个和第二个参数之间缺少逗号(第二行结尾)
  • 您输入的是 x._2_.2 而不是 x._2._2

修复这些会得到你想要的输出。

BUT - 也就是说,值得注意的是 Scala 提供了更好的语法选项来处理元组和执行简单的算术运算。更重要的是,不应该过度使用元组,一个常见的替代方法是创建一个 case class 来更好地支持您尝试执行的操作。

例如 - 我们可以创建一个简单的 Stats 案例 class,它有一个 agg 方法:

case class Stats(total: Float, min: Float, max: Float) {
  def agg(other: Stats): Stats = Stats(
    total + other.total,
    math.min(min, other.min),
    math.max(max, other.max)
  )
}

然后使用reduceByKey这个函数:

val result: RDD[(Int, Stats)] = forditems
  .map(_.split(",")).map(arr => (arr(1).toInt, arr(4).toFloat))
  .mapValues(f => Stats(f, f, f))
  .reduceByKey(_ agg _)

此外,如果您愿意使用 Spark 的 DateFrames - 这会变得更加简单:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = forditems.map(_.split(",")).map(arr => (arr(1).toInt, arr(4).toFloat)).toDF("k", "v")

val resultDf = df.groupBy("k").agg(sum($"v"), min($"v"), max($"v"))
resultDf.show()

// +---+------+------+------+
// |  k|sum(v)|min(v)|max(v)|
// +---+------+------+------+
// |  1| 444.0|   4.0| 400.0|
// +---+------+------+------+