Scala: speeding up word count on a file using a (parallel) approach
I need to run a word count over an input file of up to a million lines in Scala. The lines are also very long (> 150K characters each). The following standard program works:
val wordCount = scala.io.Source.fromFile("sample.dat")
  .getLines
  .flatMap(_.split("""\W+"""))
  .foldLeft(Map.empty[String, Int]) {
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
  }
The following modification fails with the error value par is not a member of Iterator[String]:
val wordCount = scala.io.Source.fromFile("sample.dat")
  .getLines
  .flatMap(_.split("""\W+"""))
  .par
  .foldLeft(Map.empty[String, Int]) {
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
  }
This surprised me, because a similar example seems to work.
Also, I am wondering whether par.reduce would be faster or more efficient than par.foldLeft.
Any help or pointers on this would be appreciated.
TIA
Parallel collections were moved out of the standard library in 2.13, although there is an external module that brings them back. That said, if you want to process a lot of data in parallel, I would just use Spark, especially if you need an external library anyway (you can run Spark on a single node). And when you say you need this solution "for testing", testing one solution and then running a completely different one feels odd.
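If you do want the parallel-collections route, a minimal sketch could look like the following. It assumes Scala 2.13 with the scala-parallel-collections module on the classpath, and it materializes the iterator into a strict collection first, since .par is not defined on Iterator:

// Requires: libraryDependencies += "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4" (or similar)
import scala.collection.parallel.CollectionConverters._

val wordCount = scala.io.Source.fromFile("sample.dat")
  .getLines
  .flatMap(_.split("""\W+"""))
  .toVector // .par needs a strict collection; this loads every word into memory
  .par
  .aggregate(Map.empty[String, Int])(
    // fold within each partition
    (count, word) => count + (word -> (count.getOrElse(word, 0) + 1)),
    // merge the per-partition maps
    (m1, m2) => m1.foldLeft(m2) { case (m, (w, c)) => m + (w -> (m.getOrElse(w, 0) + c)) }
  )

Note that .toVector pulls the whole word list into memory, which may be impractical for a million lines of 150K+ characters; the chunked approach below avoids that. Also, on a parallel collection foldLeft still runs sequentially, which is why the sketch uses aggregate; reduce does run in parallel, but only with an associative operation over the element type itself.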
Here is a solution without external libraries (just for completeness):
// Imports needed for the sketch below
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

// First, create a local execution context to allow throttling the parallel jobs
val parallelism = 4 // how many chunks to process in parallel
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ThreadPoolExecutor(
  parallelism, parallelism, 0L, TimeUnit.SECONDS,
  new ArrayBlockingQueue[Runnable](parallelism) {
    // Block the submitter when the queue is full instead of rejecting the task,
    // so only a bounded number of chunks is ever queued up.
    override def offer(e: Runnable) = {
      put(e)
      true
    }
  }
))
// Now just split the input into chunks and send them to the executor.
// This does not read anything into memory yet.
val source = scala.io.Source.fromFile("sample.dat")
val chunkSize = 4096 // how many lines to process at once
val jobs = source
  .getLines
  .grouped(chunkSize)
  .map { chunk =>
    Future {
      chunk
        .flatMap { _.split("""\W+""") }
        .foldLeft(Map.empty[String, Int]) { case (m, w) =>
          m + (w -> (m.getOrElse(w, 0) + 1))
        }
    }
  }
// Now, combine the results.
// This will fetch `parallelism * chunkSize` lines into memory and start
// `parallelism` jobs processing the chunks. Once one of the jobs completes,
// it will read the next `chunkSize` lines and start another job. Etc.
val result: Future[Map[String, Int]] = Future.sequence(jobs.toSeq).map {
  _.reduce { (m1, m2) =>
    m1.foldLeft(m2) { case (m, (w, v)) => m + (w -> (m.getOrElse(w, 0) + v)) }
  }
}
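At this point result is still a Future; a minimal way to materialize it, assuming it is acceptable to block the calling thread, is:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Block until every chunk has been folded and merged, then release the file handle.
val wordCount: Map[String, Int] = Await.result(result, Duration.Inf)
source.close()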
The key point is that this ec implementation limits the number of futures that are currently "in flight". You can wrap it and the chunking logic into a small utility class and make it reusable as needed, as sketched below.
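As a sketch of what such a utility could look like (a hypothetical helper, not from any library), the chunking and merging can be factored into a single generic function that reuses the throttling ExecutionContext above:

// Hypothetical helper: fold an iterator in parallel, `chunkSize` elements per job.
def chunkedParFold[A, B](items: Iterator[A], chunkSize: Int)(zero: B)(
    fold: (B, A) => B)(combine: (B, B) => B)(implicit ec: ExecutionContext): Future[B] =
  Future
    .sequence(
      items
        .grouped(chunkSize)
        .map(chunk => Future(chunk.foldLeft(zero)(fold)))
        .toSeq
    )
    .map(_.foldLeft(zero)(combine))

// Word count usage (here the chunks are words rather than lines):
val counted: Future[Map[String, Int]] =
  chunkedParFold(source.getLines.flatMap(_.split("""\W+""")), 100000)(Map.empty[String, Int])(
    (m, w) => m + (w -> (m.getOrElse(w, 0) + 1)))(
    (m1, m2) => m2.foldLeft(m1) { case (m, (w, c)) => m + (w -> (m.getOrElse(w, 0) + c)) })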
Still, if I were you, I would just use Spark.
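For reference, a minimal single-node Spark version might look like the sketch below; the spark-sql dependency, the local[*] master, and the sample.dat path are assumptions, and partitioning would need tuning for the real file:

// Assumes the spark-sql dependency is on the classpath.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]") // use all local cores; no cluster needed
  .appName("word-count")
  .getOrCreate()

val counts = spark.sparkContext
  .textFile("sample.dat")
  .flatMap(_.split("""\W+"""))
  .filter(_.nonEmpty)
  .map(word => (word, 1))
  .reduceByKey(_ + _) // combines within each partition first, then across partitions

counts.take(20).foreach(println)
spark.stop()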