为什么 Source.tick 在一百个 Http 请求后停止?
Why does Source.tick stop after one hundred HttpRequests?
使用 akka 流和 akka HTTP,我创建了一个流,它每 3 秒轮询一次 api,将结果解组到 JsValue 对象并将该结果发送给参与者。从下面的代码可以看出:
// Source wich performs an http request every 3 seconds.
val source = Source.tick(0.seconds,
3.seconds,
HttpRequest(uri = Uri(path = Path("/posts/1"))))
// Processes the result of the http request
val flow = Http().outgoingConnectionHttps("jsonplaceholder.typicode.com").mapAsync(1) {
// Able to reach the API.
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Unmarshal the json response.
Unmarshal(entity).to[JsValue]
// Failed to reach the API.
case HttpResponse(code, _, entity, _) =>
entity.discardBytes()
Future.successful(code.toString())
}
// Run stream
source.via(flow).runWith(Sink.actorRef[Any](processJsonActor,akka.actor.Status.Success(("Completed stream"))))
这有效,但流会在 100 个 HttpRequest(滴答)后关闭。
这种行为的原因是什么?
绝对与 outgoingConnectionHttps
有关。这是一个低级别的 DSL,可能在某处存在一些配置错误的设置,这是导致这种情况的原因(尽管我不知道是哪一个)。
docs 实际上不鼓励使用此 DSL。
尝试使用更高级别的 DSL,例如缓存连接池
val flow = Http().cachedHostConnectionPoolHttps[NotUsed]("akka.io").mapAsync(1) {
// Able to reach the API.
case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
// Unmarshal the json response.
Unmarshal(entity).to[String]
// Failed to reach the API.
case (Success(HttpResponse(code, _, entity, _)), _) =>
entity.discardBytes()
Future.successful(code.toString())
case (Failure(e), _) ⇒
throw e
}
// Run stream
source.map(_ → NotUsed).via(flow).runWith(...)
一个潜在的问题是 Sink.actorRef
没有背压信号,因此演员的邮箱可能已满。如果 actor 在收到 JsValue
对象时正在做一些可能需要很长时间的事情,请改用 Sink.actorRefWithAck
。例如:
val initMessage = "start"
val completeMessage = "done"
val ackMessage = "ack"
source
.via(flow)
.runWith(Sink.actorRefWithAck[Any](
processJsonActor, initMessage, ackMessage, completeMessage))
您需要更改 actor 以处理 initMessage
并使用 ackMessage
(使用 sender ! ackMessage
)回复每个流元素的流。有关 Sink.actorRefWithAck
的更多信息,请参见 here。
使用 akka 流和 akka HTTP,我创建了一个流,它每 3 秒轮询一次 api,将结果解组到 JsValue 对象并将该结果发送给参与者。从下面的代码可以看出:
// Source wich performs an http request every 3 seconds.
val source = Source.tick(0.seconds,
3.seconds,
HttpRequest(uri = Uri(path = Path("/posts/1"))))
// Processes the result of the http request
val flow = Http().outgoingConnectionHttps("jsonplaceholder.typicode.com").mapAsync(1) {
// Able to reach the API.
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Unmarshal the json response.
Unmarshal(entity).to[JsValue]
// Failed to reach the API.
case HttpResponse(code, _, entity, _) =>
entity.discardBytes()
Future.successful(code.toString())
}
// Run stream
source.via(flow).runWith(Sink.actorRef[Any](processJsonActor,akka.actor.Status.Success(("Completed stream"))))
这有效,但流会在 100 个 HttpRequest(滴答)后关闭。
这种行为的原因是什么?
绝对与 outgoingConnectionHttps
有关。这是一个低级别的 DSL,可能在某处存在一些配置错误的设置,这是导致这种情况的原因(尽管我不知道是哪一个)。
docs 实际上不鼓励使用此 DSL。
尝试使用更高级别的 DSL,例如缓存连接池
val flow = Http().cachedHostConnectionPoolHttps[NotUsed]("akka.io").mapAsync(1) {
// Able to reach the API.
case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
// Unmarshal the json response.
Unmarshal(entity).to[String]
// Failed to reach the API.
case (Success(HttpResponse(code, _, entity, _)), _) =>
entity.discardBytes()
Future.successful(code.toString())
case (Failure(e), _) ⇒
throw e
}
// Run stream
source.map(_ → NotUsed).via(flow).runWith(...)
一个潜在的问题是 Sink.actorRef
没有背压信号,因此演员的邮箱可能已满。如果 actor 在收到 JsValue
对象时正在做一些可能需要很长时间的事情,请改用 Sink.actorRefWithAck
。例如:
val initMessage = "start"
val completeMessage = "done"
val ackMessage = "ack"
source
.via(flow)
.runWith(Sink.actorRefWithAck[Any](
processJsonActor, initMessage, ackMessage, completeMessage))
您需要更改 actor 以处理 initMessage
并使用 ackMessage
(使用 sender ! ackMessage
)回复每个流元素的流。有关 Sink.actorRefWithAck
的更多信息,请参见 here。