Scala:使用(并行)方法加快文件的字数统计

Scala: speeding up word count on a file using (parallel) approach

我需要 运行 在 Scala 中对多达一百万行的输入文件进行字数统计。每行也很长(> 150K 个字符)。以下是有效的标准程序:

val wordCount = scala.io.Source.fromFile("sample.dat")
.getLines
.flatMap(_.split("\W+"))
.foldLeft(Map.empty[String, Int]){
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
}

以下修改失败并出现错误,value par is not a member of Iterator[String]

val wordCount = scala.io.Source.fromFile("sample.dat")
.getLines
.flatMap(_.split("\W+"))
.par
.foldLeft(Map.empty[String, Int]){
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
}

我对此感到惊讶,因为类似的 似乎有效。

此外,我想知道 par.reduce 是否会比正常工作的 par.foldLeft 更快更高效。

对于此问题的任何帮助或线索,我们将不胜感激。

TIA

查看 。并行集合从 2.13 开始消失,尽管有一个外部库可供您使用。尽管如此,我还是要说,如果您希望并行处理大量数据,只需使用 spark(特别是,如果您无论如何都需要一个外部库(您可以 运行 在单个节点上使用 spark ...当你说,你需要这个解决方案“用于测试”时,你想测试一个解决方案,然后 运行 一个完全不同的解决方案感觉很奇怪。

这里有一个没有外部库的解决方案(只是为了完整性):

    // First, create a local execution context to allow throttling the parallel jobs 
   val parallelism = 4 // how many chunks to process in parallel

   implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(
    parallelism, parallelism, 0L, TimeUnit.SECONDS,
    new ArrayBlockingQueue[Runnable](parallelism) {
      override def offer(e: Runnable) = {
        put(e)
        true
      }
    }
  ))

   // Now just split input into chunks and send to the executor
   // This does not read anything into memory yet
   
   val chunkSize = 4096 // how many lines to process at once

    val jobs = source 
     .getLines
     .grouped(chunkSize)
     .map { chunk => 
       Future {
          chunk
            .flatMap { _.split("""\W+""") }
            .foldLeft(Map.empty[String, Int]) { case (m, w) => 
                m + (w -> (m.getOrElse(w, 0) + 1))
            }
       }
     }

  // Now, combine the results.
  // This will fetch `parallelism*chunkSize` lines into memory and start
  // parallelism jobs processing the chunks. Once one of the jobs completes, 
  // it will read next `chunkSize` lines, and start another job. Etc.
    val result: Future[Map[String, Int]] = Future.sequence(jobs.toSeq).map { 
      _.reduce { (m1, m2) =>
        m1.foldLeft(m2){ case (m, (w, v)) => m + (w -> (m.getOrElse(w,0) + v)) }
      }
  }

关键是 ec 实施限制了当前“飞行中”的期货数量。您可以将它和分块逻辑包装到一个小实用程序 class 中,并根据需要使其可重用。 不过,如果我是你,我仍然会使用 spark。