无法使用 Spark Scala 应用 AggregateByKey 来获取给定输入(id,值)的(id,total,(max,min))
Not able to apply AggregateByKey using Spark Scala to get (id, total,( max, min)) for given input(id, value)
我试过这段代码,但它说元组不能被解构。
forditems.map(stre=>(stre.split(",")(1).toInt, stre.split(",")(4).toFloat)).aggregateByKey((0.0f, (0.0f, 0.0f)))(
(t,v) => (t._1 + v, ( if(v > t._2._1) v else t._2._1 , if(v > t._2._2) v else t._2._2 ))
(x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else x._2._1, if(x._2_.2 < t._2._2) t._2._2 else x._2._2 )))
错误信息
<console>:7: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
Either create a single parameter accepting the Tuple1,
or consider a pattern matching anonymous function: `{ case (param1, param1) => ... }
(x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else x._2._1,
if(x._2_.2 < t._2._2) t._2._2 else x._2._2 ))).filter(stre=>
stre._1==2).take(10).foreach(println)
^
<console>:7: error: ')' expected but double literal found.
(x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else x._2._1,
if(x._2_.2 < t._2._2) t._2._2 else x._2._2 ))).filter(stre=>
stre._1==2).take(10).foreach(println)
这里实际上只有很小的句法错误:
- 传递给
aggregateByKey
的第一个和第二个参数之间缺少逗号(第二行结尾)
- 您输入的是
x._2_.2
而不是 x._2._2
修复这些会得到你想要的输出。
BUT - 也就是说,值得注意的是 Scala 提供了更好的语法选项来处理元组和执行简单的算术运算。更重要的是,不应该过度使用元组,一个常见的替代方法是创建一个 case class 来更好地支持您尝试执行的操作。
例如 - 我们可以创建一个简单的 Stats
案例 class,它有一个 agg
方法:
case class Stats(total: Float, min: Float, max: Float) {
def agg(other: Stats): Stats = Stats(
total + other.total,
math.min(min, other.min),
math.max(max, other.max)
)
}
然后使用reduceByKey
这个函数:
val result: RDD[(Int, Stats)] = forditems
.map(_.split(",")).map(arr => (arr(1).toInt, arr(4).toFloat))
.mapValues(f => Stats(f, f, f))
.reduceByKey(_ agg _)
此外,如果您愿意使用 Spark 的 DateFrames - 这会变得更加简单:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = forditems.map(_.split(",")).map(arr => (arr(1).toInt, arr(4).toFloat)).toDF("k", "v")
val resultDf = df.groupBy("k").agg(sum($"v"), min($"v"), max($"v"))
resultDf.show()
// +---+------+------+------+
// | k|sum(v)|min(v)|max(v)|
// +---+------+------+------+
// | 1| 444.0| 4.0| 400.0|
// +---+------+------+------+
我试过这段代码,但它说元组不能被解构。
forditems.map(stre=>(stre.split(",")(1).toInt, stre.split(",")(4).toFloat)).aggregateByKey((0.0f, (0.0f, 0.0f)))(
(t,v) => (t._1 + v, ( if(v > t._2._1) v else t._2._1 , if(v > t._2._2) v else t._2._2 ))
(x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else x._2._1, if(x._2_.2 < t._2._2) t._2._2 else x._2._2 )))
错误信息
<console>:7: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
Either create a single parameter accepting the Tuple1,
or consider a pattern matching anonymous function: `{ case (param1, param1) => ... }
(x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else x._2._1,
if(x._2_.2 < t._2._2) t._2._2 else x._2._2 ))).filter(stre=>
stre._1==2).take(10).foreach(println)
^
<console>:7: error: ')' expected but double literal found.
(x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else x._2._1,
if(x._2_.2 < t._2._2) t._2._2 else x._2._2 ))).filter(stre=>
stre._1==2).take(10).foreach(println)
这里实际上只有很小的句法错误:
- 传递给
aggregateByKey
的第一个和第二个参数之间缺少逗号(第二行结尾) - 您输入的是
x._2_.2
而不是x._2._2
修复这些会得到你想要的输出。
BUT - 也就是说,值得注意的是 Scala 提供了更好的语法选项来处理元组和执行简单的算术运算。更重要的是,不应该过度使用元组,一个常见的替代方法是创建一个 case class 来更好地支持您尝试执行的操作。
例如 - 我们可以创建一个简单的 Stats
案例 class,它有一个 agg
方法:
case class Stats(total: Float, min: Float, max: Float) {
def agg(other: Stats): Stats = Stats(
total + other.total,
math.min(min, other.min),
math.max(max, other.max)
)
}
然后使用reduceByKey
这个函数:
val result: RDD[(Int, Stats)] = forditems
.map(_.split(",")).map(arr => (arr(1).toInt, arr(4).toFloat))
.mapValues(f => Stats(f, f, f))
.reduceByKey(_ agg _)
此外,如果您愿意使用 Spark 的 DateFrames - 这会变得更加简单:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = forditems.map(_.split(",")).map(arr => (arr(1).toInt, arr(4).toFloat)).toDF("k", "v")
val resultDf = df.groupBy("k").agg(sum($"v"), min($"v"), max($"v"))
resultDf.show()
// +---+------+------+------+
// | k|sum(v)|min(v)|max(v)|
// +---+------+------+------+
// | 1| 444.0| 4.0| 400.0|
// +---+------+------+------+