这个 Akka 流有时不会结束
This Akka stream sometimes doesn't finish
我有一个图表,它从多个 gzip 文件中读取行并将这些行写入另一组 gzip 文件,根据每行中的某个值进行映射。
它适用于小数据集,但无法终止较大数据。 (这可能不是数据大小的问题,因为我没有 运行 足够多的时间来确定 - 这需要一段时间)。
def files: Source[File, NotUsed] =
Source.fromIterator(
() =>
Files
.fileTraverser()
.breadthFirst(inDir)
.asScala
.filter(_.getName.endsWith(".gz"))
.toIterator)
def extract =
Flow[File]
.mapConcat[String](unzip)
.mapConcat(s =>
(JsonMethods.parse(s) \ "tk").extract[Array[String]].map(_ -> s).to[collection.immutable.Iterable])
.groupBy(1 << 16, _._1)
.groupedWithin(1000, 1.second)
.map { lines =>
val w = writer(lines.head._1)
w.println(lines.map(_._2).mkString("\n"))
w.close()
Done
}
.mergeSubstreams
def unzip(f: File) = {
scala.io.Source
.fromInputStream(new GZIPInputStream(new FileInputStream(f)))
.getLines
.toIterable
.to[collection.immutable.Iterable]
}
def writer(tk: String): PrintWriter =
new PrintWriter(
new OutputStreamWriter(
new GZIPOutputStream(
new FileOutputStream(new File(outDir, s"$tk.json.gz"), true)
))
)
val process = files.via(extract).toMat(Sink.ignore)(Keep.right).run()
Await.result(process, Duration.Inf)
线程转储显示进程 WAITING
在 Await.result(process, Duration.Inf)
,没有其他事情发生。
带有 Akka v2.5.15 的 OpenJDK v11
很可能它卡在了 groupBy
,因为它 运行 调度程序中的可用线程无法将项目收集到所有来源的 2^16 个组中。
因此,如果我是你,我可能会使用 statefulMapConcat
和可变 Map[KeyType, List[String]]
半手动地在 extract
中实现分组。或者先用 groupedWithin
缓冲行,然后将它们分成几组,然后写入 Sink.foreach
.
中的不同文件
我有一个图表,它从多个 gzip 文件中读取行并将这些行写入另一组 gzip 文件,根据每行中的某个值进行映射。
它适用于小数据集,但无法终止较大数据。 (这可能不是数据大小的问题,因为我没有 运行 足够多的时间来确定 - 这需要一段时间)。
def files: Source[File, NotUsed] =
Source.fromIterator(
() =>
Files
.fileTraverser()
.breadthFirst(inDir)
.asScala
.filter(_.getName.endsWith(".gz"))
.toIterator)
def extract =
Flow[File]
.mapConcat[String](unzip)
.mapConcat(s =>
(JsonMethods.parse(s) \ "tk").extract[Array[String]].map(_ -> s).to[collection.immutable.Iterable])
.groupBy(1 << 16, _._1)
.groupedWithin(1000, 1.second)
.map { lines =>
val w = writer(lines.head._1)
w.println(lines.map(_._2).mkString("\n"))
w.close()
Done
}
.mergeSubstreams
def unzip(f: File) = {
scala.io.Source
.fromInputStream(new GZIPInputStream(new FileInputStream(f)))
.getLines
.toIterable
.to[collection.immutable.Iterable]
}
def writer(tk: String): PrintWriter =
new PrintWriter(
new OutputStreamWriter(
new GZIPOutputStream(
new FileOutputStream(new File(outDir, s"$tk.json.gz"), true)
))
)
val process = files.via(extract).toMat(Sink.ignore)(Keep.right).run()
Await.result(process, Duration.Inf)
线程转储显示进程 WAITING
在 Await.result(process, Duration.Inf)
,没有其他事情发生。
带有 Akka v2.5.15 的 OpenJDK v11
很可能它卡在了 groupBy
,因为它 运行 调度程序中的可用线程无法将项目收集到所有来源的 2^16 个组中。
因此,如果我是你,我可能会使用 statefulMapConcat
和可变 Map[KeyType, List[String]]
半手动地在 extract
中实现分组。或者先用 groupedWithin
缓冲行,然后将它们分成几组,然后写入 Sink.foreach
.