Akka 流 Source.repeat 在 100 个请求后停止
Akka streams Source.repeat stops after 100 requests
我正在研究以下流处理系统,以从一个来源抓取帧、处理并发送到另一个来源。我通过他们的 scapa api 使用 akka-streams
和 akka-http
的组合。管道非常短,但我似乎无法找到系统在向端点发出恰好 100 次请求后决定停止的位置。
object frameProcessor extends App {
implicit val system: ActorSystem = ActorSystem("VideoStreamProcessor")
val decider: Supervision.Decider = _ => Supervision.Restart
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = system.dispatcher
val http = Http(system)
val sourceConnectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = http.outgoingConnection(sourceUri)
val byteFlow: Flow[HttpResponse, Future[ByteString], NotUsed] =
Flow[HttpResponse].map(_.entity.dataBytes.runFold(ByteString.empty)(_ ++ _))
Source.repeat(HttpRequest(uri = sourceUri))
.via(sourceConnectionFlow)
.via(byteFlow)
.map(postFrame)
.runWith(Sink.ignore)
.onComplete(_ => system.terminate())
def postFrame(imageBytes: Future[ByteString]): Unit = {
imageBytes.onComplete{
case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
case Failure(_) => system.log.error("failed to post image!")
}
}
}
作为参考,我使用的是 akka-streams
版本 2.5.19 和 akka-http
版本 10.1.7.没有抛出错误,帧来源服务器上没有错误代码,程序以错误代码 0 退出。
我的application.conf
如下:
logging = "DEBUG"
始终处理 100 个单位。
谢谢!
编辑
像这样向流添加日志记录
.onComplete{
case Success(res) => {
system.log.info(res.toString)
system.terminate()
}
case Failure(res) => {
system.log.error(res.getMessage)
system.terminate()
}
}
收到连接重置异常,但这不一致。流以 Done
.
结束
编辑 2
使用 .mapAsync(1)(postFrame)
我在恰好 100 个请求后得到相同的 Success(Done)
。此外,当我检查 nginx 服务器 access.log
和 error.log
时,只有 200
响应。
我不得不将 postFrame
修改为 运行 mapAsync
def postFrame(imageBytes: Future[ByteString]): Future[Unit] = {
imageBytes.onComplete{
case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
case Failure(_) => system.log.error("failed to post image!")
}
Future(Unit)
}
我相信我已经在 Akka docs using delayed restarts with a backoff operator 上找到了答案。我没有直接从不稳定的远程连接获取资源,而是使用 RestartSource.withBackoff
和 而不是 RestartSource.onFailureWithBackoff
。修改后的流看起来像;
val restartSource = RestartSource.withBackoff(
minBackoff = 100.milliseconds,
maxBackoff = 1.seconds,
randomFactor = 0.2
){ () =>
Source.single(HttpRequest(uri = sourceUri))
.via(sourceConnectionFlow)
.via(byteFlow)
.mapAsync(1)(postFrame)
}
restartSource
.runWith(Sink.ignore)
.onComplete{
x => {
println(x)
system.terminate()
}
}
我无法找到问题的根源,但它似乎可以解决问题。
我正在研究以下流处理系统,以从一个来源抓取帧、处理并发送到另一个来源。我通过他们的 scapa api 使用 akka-streams
和 akka-http
的组合。管道非常短,但我似乎无法找到系统在向端点发出恰好 100 次请求后决定停止的位置。
object frameProcessor extends App {
implicit val system: ActorSystem = ActorSystem("VideoStreamProcessor")
val decider: Supervision.Decider = _ => Supervision.Restart
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = system.dispatcher
val http = Http(system)
val sourceConnectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = http.outgoingConnection(sourceUri)
val byteFlow: Flow[HttpResponse, Future[ByteString], NotUsed] =
Flow[HttpResponse].map(_.entity.dataBytes.runFold(ByteString.empty)(_ ++ _))
Source.repeat(HttpRequest(uri = sourceUri))
.via(sourceConnectionFlow)
.via(byteFlow)
.map(postFrame)
.runWith(Sink.ignore)
.onComplete(_ => system.terminate())
def postFrame(imageBytes: Future[ByteString]): Unit = {
imageBytes.onComplete{
case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
case Failure(_) => system.log.error("failed to post image!")
}
}
}
作为参考,我使用的是 akka-streams
版本 2.5.19 和 akka-http
版本 10.1.7.没有抛出错误,帧来源服务器上没有错误代码,程序以错误代码 0 退出。
我的application.conf
如下:
logging = "DEBUG"
始终处理 100 个单位。
谢谢!
编辑
像这样向流添加日志记录
.onComplete{
case Success(res) => {
system.log.info(res.toString)
system.terminate()
}
case Failure(res) => {
system.log.error(res.getMessage)
system.terminate()
}
}
收到连接重置异常,但这不一致。流以 Done
.
编辑 2
使用 .mapAsync(1)(postFrame)
我在恰好 100 个请求后得到相同的 Success(Done)
。此外,当我检查 nginx 服务器 access.log
和 error.log
时,只有 200
响应。
我不得不将 postFrame
修改为 运行 mapAsync
def postFrame(imageBytes: Future[ByteString]): Future[Unit] = {
imageBytes.onComplete{
case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
case Failure(_) => system.log.error("failed to post image!")
}
Future(Unit)
}
我相信我已经在 Akka docs using delayed restarts with a backoff operator 上找到了答案。我没有直接从不稳定的远程连接获取资源,而是使用 RestartSource.withBackoff
和 而不是 RestartSource.onFailureWithBackoff
。修改后的流看起来像;
val restartSource = RestartSource.withBackoff(
minBackoff = 100.milliseconds,
maxBackoff = 1.seconds,
randomFactor = 0.2
){ () =>
Source.single(HttpRequest(uri = sourceUri))
.via(sourceConnectionFlow)
.via(byteFlow)
.mapAsync(1)(postFrame)
}
restartSource
.runWith(Sink.ignore)
.onComplete{
x => {
println(x)
system.terminate()
}
}
我无法找到问题的根源,但它似乎可以解决问题。