如何在 "Scala" [Not In Spark] 中按键减少

How to Reduce by key in "Scala" [Not In Spark]

我正在尝试在 Scala 中 reduceByKeys,有没有什么方法可以根据 Scala 中的键来减少值。 [我知道我们可以通过 spark 中的 reduceByKey 方法来做,但是我们如何在 Scala 中做同样的事情呢? ]

输入数据是:

val File = Source.fromFile("C:/Users/svk12/git/data/retail_db/order_items/part-00000")
                 .getLines()
                 .toList

 val map = File.map(x => x.split(","))
               .map(x => (x(1),x(4)))

  map.take(10).foreach(println)

在上述步骤之后我得到的结果是:

(2,250.0)
(2,129.99)
(4,49.98)
(4,299.95)
(4,150.0)
(4,199.92)
(5,299.98)
(5,299.95)

预期结果:

(2,379.99)
(5,499.93)
.......

没有什么内置的,但是你可以这样写:

def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = {
  var result = Map.empty[A, B]
  items.foreach {
    case (a, b) =>
      result += (a -> result.get(a).map(b1 => f(b1, b)).getOrElse(b))
  }
  result
}

有一些 space 可以对此进行优化(例如使用可变映射),但总体思路保持不变。

另一种方法,更具声明性但效率较低(创建多个中间集合;可以重写但不清晰:

def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = {
  items
    .groupBy { case (a, _) => a }
    .mapValues(_.map { case (_, b) => b }.reduce(f))
    // mapValues returns a view, view.force changes it back to a realized map
    .view.force
}

首先使用键对元组进行分组,第一个元素在这里,然后归约。 以下代码将起作用 -

val reducedList = map.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce(_+_)))
print(reducedList)

您似乎想要文件中某些值的总和。一个问题是文件是字符串,因此您必须将 String 转换为数字格式才能求和。

这些是您可能会用到的步骤。

io.Source.fromFile("so.txt") //open file
  .getLines()                //read line-by-line
  .map(_.split(","))         //each line is Array[String]
  .toSeq                     //to something that can groupBy()
  .groupBy(_(1))             //now is Map[String,Array[String]]
  .mapValues(_.map(_(4).toInt).sum) //now is Map[String,Int]
  .toSeq                     //un-Map it to (String,Int) tuples
  .sorted                    //presentation order
  .take(10)                  //sample
  .foreach(println)          //report

如果任何文件数据不符合要求的格式,这当然会抛出异常。

Scala 2.13 开始,您可以使用 groupMapReduce 方法(顾名思义)相当于 groupBy 后跟 mapValuesreduce 步骤:

io.Source.fromFile("file.txt")
  .getLines.to(LazyList)
  .map(_.split(','))
  .groupMapReduce(_(1))(_(4).toDouble)(_ + _)

groupMapReduce阶段:

  • groups 按第二个元素 (_(1)) 拆分数组(groupMapReduce 的组部分)

  • maps 每个组中的每个数组出现到它的第 4 个元素并将其转换为 Double (_(4).toDouble)(映射部分组 映射减少)

  • reduces 每个组 (_ + _) 中的值求和(减少部分 groupMapReduce)。

这是 one-pass version 可以翻译的内容:

seq.groupBy(_(1)).mapValues(_.map(_(4).toDouble).reduce(_ + _))

还要注意从 IteratorLazyList 的转换,以便使用提供 groupMapReduce 的集合(我们不使用 Stream,因为开始 Scala 2.13, LazyListStreams 的推荐替代品).

这是另一个使用 foldLeft 的解决方案:

val File : List[String] = ???

File.map(x => x.split(","))
  .map(x => (x(1),x(4).toInt))
  .foldLeft(Map.empty[String,Int]){case (state, (key,value)) => state.updated(key,state.get(key).getOrElse(0)+value)}
  .toSeq
  .sortBy(_._1)
  .take(10)
  .foreach(println)