如何在 "Scala" [Not In Spark] 中按键减少
How to Reduce by key in "Scala" [Not In Spark]
我正在尝试在 Scala 中 reduceByKeys,有没有什么方法可以根据 Scala 中的键来减少值。 [我知道我们可以通过 spark 中的 reduceByKey 方法来做,但是我们如何在 Scala 中做同样的事情呢? ]
输入数据是:
val File = Source.fromFile("C:/Users/svk12/git/data/retail_db/order_items/part-00000")
.getLines()
.toList
val map = File.map(x => x.split(","))
.map(x => (x(1),x(4)))
map.take(10).foreach(println)
在上述步骤之后我得到的结果是:
(2,250.0)
(2,129.99)
(4,49.98)
(4,299.95)
(4,150.0)
(4,199.92)
(5,299.98)
(5,299.95)
预期结果:
(2,379.99)
(5,499.93)
.......
没有什么内置的,但是你可以这样写:
def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = {
var result = Map.empty[A, B]
items.foreach {
case (a, b) =>
result += (a -> result.get(a).map(b1 => f(b1, b)).getOrElse(b))
}
result
}
有一些 space 可以对此进行优化(例如使用可变映射),但总体思路保持不变。
另一种方法,更具声明性但效率较低(创建多个中间集合;可以重写但不清晰:
def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = {
items
.groupBy { case (a, _) => a }
.mapValues(_.map { case (_, b) => b }.reduce(f))
// mapValues returns a view, view.force changes it back to a realized map
.view.force
}
首先使用键对元组进行分组,第一个元素在这里,然后归约。
以下代码将起作用 -
val reducedList = map.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce(_+_)))
print(reducedList)
您似乎想要文件中某些值的总和。一个问题是文件是字符串,因此您必须将 String
转换为数字格式才能求和。
这些是您可能会用到的步骤。
io.Source.fromFile("so.txt") //open file
.getLines() //read line-by-line
.map(_.split(",")) //each line is Array[String]
.toSeq //to something that can groupBy()
.groupBy(_(1)) //now is Map[String,Array[String]]
.mapValues(_.map(_(4).toInt).sum) //now is Map[String,Int]
.toSeq //un-Map it to (String,Int) tuples
.sorted //presentation order
.take(10) //sample
.foreach(println) //report
如果任何文件数据不符合要求的格式,这当然会抛出异常。
从 Scala 2.13
开始,您可以使用 groupMapReduce
方法(顾名思义)相当于 groupBy
后跟 mapValues
和 reduce
步骤:
io.Source.fromFile("file.txt")
.getLines.to(LazyList)
.map(_.split(','))
.groupMapReduce(_(1))(_(4).toDouble)(_ + _)
groupMapReduce
阶段:
group
s 按第二个元素 (_(1)
) 拆分数组(groupMapReduce 的组部分)
map
s 每个组中的每个数组出现到它的第 4 个元素并将其转换为 Double
(_(4).toDouble
)(映射部分组 映射减少)
reduce
s 每个组 (_ + _
) 中的值求和(减少部分 groupMapReduce)。
这是 one-pass version 可以翻译的内容:
seq.groupBy(_(1)).mapValues(_.map(_(4).toDouble).reduce(_ + _))
还要注意从 Iterator
到 LazyList
的转换,以便使用提供 groupMapReduce
的集合(我们不使用 Stream
,因为开始 Scala 2.13
, LazyList
是 Stream
s 的推荐替代品).
这是另一个使用 foldLeft 的解决方案:
val File : List[String] = ???
File.map(x => x.split(","))
.map(x => (x(1),x(4).toInt))
.foldLeft(Map.empty[String,Int]){case (state, (key,value)) => state.updated(key,state.get(key).getOrElse(0)+value)}
.toSeq
.sortBy(_._1)
.take(10)
.foreach(println)
我正在尝试在 Scala 中 reduceByKeys,有没有什么方法可以根据 Scala 中的键来减少值。 [我知道我们可以通过 spark 中的 reduceByKey 方法来做,但是我们如何在 Scala 中做同样的事情呢? ]
输入数据是:
val File = Source.fromFile("C:/Users/svk12/git/data/retail_db/order_items/part-00000")
.getLines()
.toList
val map = File.map(x => x.split(","))
.map(x => (x(1),x(4)))
map.take(10).foreach(println)
在上述步骤之后我得到的结果是:
(2,250.0)
(2,129.99)
(4,49.98)
(4,299.95)
(4,150.0)
(4,199.92)
(5,299.98)
(5,299.95)
预期结果:
(2,379.99)
(5,499.93)
.......
没有什么内置的,但是你可以这样写:
def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = {
var result = Map.empty[A, B]
items.foreach {
case (a, b) =>
result += (a -> result.get(a).map(b1 => f(b1, b)).getOrElse(b))
}
result
}
有一些 space 可以对此进行优化(例如使用可变映射),但总体思路保持不变。
另一种方法,更具声明性但效率较低(创建多个中间集合;可以重写但不清晰:
def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = {
items
.groupBy { case (a, _) => a }
.mapValues(_.map { case (_, b) => b }.reduce(f))
// mapValues returns a view, view.force changes it back to a realized map
.view.force
}
首先使用键对元组进行分组,第一个元素在这里,然后归约。 以下代码将起作用 -
val reducedList = map.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce(_+_)))
print(reducedList)
您似乎想要文件中某些值的总和。一个问题是文件是字符串,因此您必须将 String
转换为数字格式才能求和。
这些是您可能会用到的步骤。
io.Source.fromFile("so.txt") //open file
.getLines() //read line-by-line
.map(_.split(",")) //each line is Array[String]
.toSeq //to something that can groupBy()
.groupBy(_(1)) //now is Map[String,Array[String]]
.mapValues(_.map(_(4).toInt).sum) //now is Map[String,Int]
.toSeq //un-Map it to (String,Int) tuples
.sorted //presentation order
.take(10) //sample
.foreach(println) //report
如果任何文件数据不符合要求的格式,这当然会抛出异常。
从 Scala 2.13
开始,您可以使用 groupMapReduce
方法(顾名思义)相当于 groupBy
后跟 mapValues
和 reduce
步骤:
io.Source.fromFile("file.txt")
.getLines.to(LazyList)
.map(_.split(','))
.groupMapReduce(_(1))(_(4).toDouble)(_ + _)
groupMapReduce
阶段:
group
s 按第二个元素 (_(1)
) 拆分数组(groupMapReduce 的组部分)map
s 每个组中的每个数组出现到它的第 4 个元素并将其转换为Double
(_(4).toDouble
)(映射部分组 映射减少)reduce
s 每个组 (_ + _
) 中的值求和(减少部分 groupMapReduce)。
这是 one-pass version 可以翻译的内容:
seq.groupBy(_(1)).mapValues(_.map(_(4).toDouble).reduce(_ + _))
还要注意从 Iterator
到 LazyList
的转换,以便使用提供 groupMapReduce
的集合(我们不使用 Stream
,因为开始 Scala 2.13
, LazyList
是 Stream
s 的推荐替代品).
这是另一个使用 foldLeft 的解决方案:
val File : List[String] = ???
File.map(x => x.split(","))
.map(x => (x(1),x(4).toInt))
.foldLeft(Map.empty[String,Int]){case (state, (key,value)) => state.updated(key,state.get(key).getOrElse(0)+value)}
.toSeq
.sortBy(_._1)
.take(10)
.foreach(println)