如何 运行 Akka Streams 图形在一个单独的调度器上超时?
How to run Akka Streams graph on a separate dispatcher with timeout?
这个问题是基于我做的一个宠物项目和这个 线程。在 Akka HTTP 路由定义中,我启动了一个 long-运行 进程,我自然希望在不阻塞用户的情况下执行此操作。我可以使用以下代码片段实现此目的:
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
}
throughput = 1
}
complete {
Try(new URL(url)) match {
case scala.util.Success(u) => {
val src = Source.fromIterator(() => parseMovies(u).iterator)
src
.via(findMovieByTitleAndYear)
.via(persistMovies)
.toMat(Sink.fold(Future(0))((acc, elem) => Applicative[Future].map2(acc, elem)(_ + _)))(Keep.right)
// run the whole graph on a separate dispatcher
.withAttributes(ActorAttributes.dispatcher("blocking-io-dispatcher"))
.run.flatten
.onComplete {
_ match {
case scala.util.Success(n) => logger.info(s"Created $n movies")
case Failure(t) => logger.error(t, "Failed to process movies")
}
}
Accepted
}
case Failure(t) => logger.error(t, "Bad URL"); BadRequest -> "Bad URL"
}
}
如果我已经解决了,那还有什么问题呢?问题是我不确定如何设置超时。图的执行创建了一个 Future
,该 Future
在专用的 blocking-io-dispatcher
上执行直到完成。如果我添加一个 Await
调用,代码就会阻塞。有没有办法设置超时?
completionTimeout
stage 应该在这里有所帮助。示例如下:
src
.completionTimeout(5.seconds)
...
.run.flatten
.onComplete {
case scala.util.Success(n) => logger.info(s"Created $n movies")
case Failure(t: TimeoutException) => logger.error(t, "Timed out")
case Failure(t) => logger.error(t, "Failed to process movies")
}
文档参考 here.
这个问题是基于我做的一个宠物项目和这个
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
}
throughput = 1
}
complete {
Try(new URL(url)) match {
case scala.util.Success(u) => {
val src = Source.fromIterator(() => parseMovies(u).iterator)
src
.via(findMovieByTitleAndYear)
.via(persistMovies)
.toMat(Sink.fold(Future(0))((acc, elem) => Applicative[Future].map2(acc, elem)(_ + _)))(Keep.right)
// run the whole graph on a separate dispatcher
.withAttributes(ActorAttributes.dispatcher("blocking-io-dispatcher"))
.run.flatten
.onComplete {
_ match {
case scala.util.Success(n) => logger.info(s"Created $n movies")
case Failure(t) => logger.error(t, "Failed to process movies")
}
}
Accepted
}
case Failure(t) => logger.error(t, "Bad URL"); BadRequest -> "Bad URL"
}
}
如果我已经解决了,那还有什么问题呢?问题是我不确定如何设置超时。图的执行创建了一个 Future
,该 Future
在专用的 blocking-io-dispatcher
上执行直到完成。如果我添加一个 Await
调用,代码就会阻塞。有没有办法设置超时?
completionTimeout
stage 应该在这里有所帮助。示例如下:
src
.completionTimeout(5.seconds)
...
.run.flatten
.onComplete {
case scala.util.Success(n) => logger.info(s"Created $n movies")
case Failure(t: TimeoutException) => logger.error(t, "Timed out")
case Failure(t) => logger.error(t, "Failed to process movies")
}
文档参考 here.