如何使用akka-streams实现分页
How to implement pagination with akka-streams
我需要按行处理大文件并对每个项目做一些繁重的工作(在 4 核 cpu 上),我认为代码正确:
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val sink = Sink.foreach[String](elem => println("element proceed"))
FileIO.fromPath(Paths.get("file.txt"))
.via(Framing.delimiter(ByteString("\n"), 64).map(_.utf8String))
.mapAsync(4)(v =>
//long op
Future {
Thread.sleep(500)
"updated_" + v
})
.to(sink)
.run()
但我想要输出如下:
100 element proceed
200 element proceed
300 element proceed
357 element proceed. done
如何实现?
您可以使用 Flow.grouped
:
val groupSize = 100
val groupedFlow = Flow[String].grouped(groupSize)
现在可以在 mapAsync
:
之前或之后注入此 Flow
FileIO.fromPath(Paths.get("file.txt"))
.via(Framing.delimiter(ByteString("\n"), 64).map(_.utf8String))
.via(groupedFlow)
...
我需要按行处理大文件并对每个项目做一些繁重的工作(在 4 核 cpu 上),我认为代码正确:
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val sink = Sink.foreach[String](elem => println("element proceed"))
FileIO.fromPath(Paths.get("file.txt"))
.via(Framing.delimiter(ByteString("\n"), 64).map(_.utf8String))
.mapAsync(4)(v =>
//long op
Future {
Thread.sleep(500)
"updated_" + v
})
.to(sink)
.run()
但我想要输出如下:
100 element proceed
200 element proceed
300 element proceed
357 element proceed. done
如何实现?
您可以使用 Flow.grouped
:
val groupSize = 100
val groupedFlow = Flow[String].grouped(groupSize)
现在可以在 mapAsync
:
FileIO.fromPath(Paths.get("file.txt"))
.via(Framing.delimiter(ByteString("\n"), 64).map(_.utf8String))
.via(groupedFlow)
...