如何在流中捕获未来失败的异常
How to catch future failed exception in stream
我正在尝试创建一个流来轮询休息服务并解组 json 对象。
我创建了一个 source.tick,它每 5 秒执行一次 http 请求。如果成功,HttpResponse 将包含一个 OK。如果不是,则服务不可用。结果将发送给演员。见以下代码:
def poll(pollActor: ActorRef) {
val source = Source.tick(0.seconds, 3.seconds, HttpRequest(uri = Uri(path = Path("/posts/1"))))
val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
Unmarshal(entity).to[Item]
case resp @ HttpResponse(code, _, _, _) =>
log.warning("Request failed, response code: " + code)
Future.failed(new Exception)
}
source.via(flow).runWith(Sink.actorRef[Equals](pollActor,akka.actor.Status.Success(())))
}
演员将从流中接收结果,如以下代码所示:
def receive = {
case k : Item => println(k)
case f : Failure => {
println("We failed: " + f)
}
}
未来抛出的异常应该在哪里处理,如何处理?
解决这个问题的一种方法是让失败成为您的流的明确部分。
val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
Unmarshal(entity).to[Item].map(Right(_))
case resp @ HttpResponse(code, _, _, _) =>
Future.successful(Left(MyDomainFailure("some meaningful message/data")))
}
请注意,现在您的流程类型是
Flow[HttpRequest, Either[MyDomainFailure, Item], Future[OutgoingConnection]]
这具有明确的附加价值,使下游阶段意识到故障并迫使他们处理它(好吧,在这种情况下不是真的,因为你正在使用演员。如果你留在流,你将被迫处理它们)。
def receive = {
case Right(item) => println(item)
case Left(failure) => {
println("We failed: " + failure.msg)
}
}
这是我使用的修复程序,虽然它不会产生异常,但 HttpResponse
中包含的 Failure
在接收函数中只是匹配。
def poll(pollActor: ActorRef) {
val source = Source.tick(0.seconds, 3.seconds, HttpRequest(uri = Uri(path = Path("/posts/1"))))
val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
// Where able to reach the API.
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Unmarshal the json response.
Unmarshal(entity).to[Item]
// Failed to reach the API.
case HttpResponse(code, _, _, _) =>
Future.successful(code)
}
source.via(flow).runWith(Sink.actorRef[Any](pollActor,akka.actor.Status.Success(())))
}
这里我们匹配HttpResponse
产生的Failure
。
def receive = {
case item: Item => println(item)
case failure: Failure => {
log.warning("Request failed, response code: " + failure)
}
}
我正在尝试创建一个流来轮询休息服务并解组 json 对象。
我创建了一个 source.tick,它每 5 秒执行一次 http 请求。如果成功,HttpResponse 将包含一个 OK。如果不是,则服务不可用。结果将发送给演员。见以下代码:
def poll(pollActor: ActorRef) {
val source = Source.tick(0.seconds, 3.seconds, HttpRequest(uri = Uri(path = Path("/posts/1"))))
val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
Unmarshal(entity).to[Item]
case resp @ HttpResponse(code, _, _, _) =>
log.warning("Request failed, response code: " + code)
Future.failed(new Exception)
}
source.via(flow).runWith(Sink.actorRef[Equals](pollActor,akka.actor.Status.Success(())))
}
演员将从流中接收结果,如以下代码所示:
def receive = {
case k : Item => println(k)
case f : Failure => {
println("We failed: " + f)
}
}
未来抛出的异常应该在哪里处理,如何处理?
解决这个问题的一种方法是让失败成为您的流的明确部分。
val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
Unmarshal(entity).to[Item].map(Right(_))
case resp @ HttpResponse(code, _, _, _) =>
Future.successful(Left(MyDomainFailure("some meaningful message/data")))
}
请注意,现在您的流程类型是
Flow[HttpRequest, Either[MyDomainFailure, Item], Future[OutgoingConnection]]
这具有明确的附加价值,使下游阶段意识到故障并迫使他们处理它(好吧,在这种情况下不是真的,因为你正在使用演员。如果你留在流,你将被迫处理它们)。
def receive = {
case Right(item) => println(item)
case Left(failure) => {
println("We failed: " + failure.msg)
}
}
这是我使用的修复程序,虽然它不会产生异常,但 HttpResponse
中包含的 Failure
在接收函数中只是匹配。
def poll(pollActor: ActorRef) {
val source = Source.tick(0.seconds, 3.seconds, HttpRequest(uri = Uri(path = Path("/posts/1"))))
val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
// Where able to reach the API.
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Unmarshal the json response.
Unmarshal(entity).to[Item]
// Failed to reach the API.
case HttpResponse(code, _, _, _) =>
Future.successful(code)
}
source.via(flow).runWith(Sink.actorRef[Any](pollActor,akka.actor.Status.Success(())))
}
这里我们匹配HttpResponse
产生的Failure
。
def receive = {
case item: Item => println(item)
case failure: Failure => {
log.warning("Request failed, response code: " + failure)
}
}