Scala 和 akka-http 休息服务的断路器
Circuit breaker for Scala and akka-http rest service
我目前正在使用 Akka-HTTP 实现断路器,如下所示:
def sendMail(entity: MyEntity): ToResponseMarshallable = {
Thread.sleep(5 * 1000)
validateEntity(entity).map[ToResponseMarshallable] {
case (body, subject) if !isEmpty(body, subject) => {
val mailResponse = sendMail(body, subject)
OK -> ProcessedEmailMessage(mailResponse)
}
case _ =>
BadRequest -> s"error: for $entity".toJson
}
} catch {
case e: DeserializationException => HttpResponse(BadRequest).withEntity(HttpEntity(s"error:${e.msg}").withContentType(ContentTypes.`application/json`))
}
}
val maxFailures: Int = 2
val callTimeout: FiniteDuration = 1 second
val resetTimeout: FiniteDuration = 30 seconds
def open: Unit = {
logger.info("Circuit Breaker is open")
}
def close: Unit = {
logger.info("Circuit Breaker is closed")
}
def halfopen: Unit = {
logger.info("Circuit Breaker is half-open, next message goes through")
private lazy val breaker = CircuitBreaker(
system.scheduler,
maxFailures,
callTimeout,
resetTimeout
).onOpen(open).onClose(close).onHalfOpen(halfopen)
def routes: Route = {
logRequestResult("email-service_aggregator_email") {
pathPrefix("v1") {
path("sendmail") {
post {
entity(as[EmailMessage]) { entity =>
complete {
breaker.withCircuitBreaker(Future(sendMail(entity)))
}
}
}
}
}
}
}
我的问题是,如果我使用 breaker.withCircuitBreaker(Future(sendMail(entity)))
,断路器会进入打开状态,但其余响应 returns There was an internal server error
作为响应
如果我改为使用 breaker.withSyncCircuitBreaker(Future(sendMail(entity)))
,则断路器永远不会进入打开状态,但它 return 是预期的 HttpResponse
关于如何解决这个问题以触发断路器和 return 正确的 HTTP 响应有什么想法吗?
entity(as[EmailMessage]) { entity => ctx =>
val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity)))
val withErrorHandling = withBreaker.recover {
case _: CircuitBreakerOpenException =>
HttpResponse(TooManyRequests).withEntity("Server Busy")
}
ctx.complete(withErrorHandling)
}
我将提供另一种可能的解决方案,因为我相信 onComplete
是完成路线时处理 Future
结果的惯用方式:
entity(as[EmailMessage]) { entity =>
val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity)))
onComplete(withBreaker){
case Success(trm) =>
complete(trm)
//Circuit breaker opened handling
case Failure(ex:CircuitBreakerOpenException) =>
complete(HttpResponse(TooManyRequests).withEntity("Server Busy"))
//General exception handling
case Failure(ex) =>
complete(InternalServerError)
}
}
我目前正在使用 Akka-HTTP 实现断路器,如下所示:
def sendMail(entity: MyEntity): ToResponseMarshallable = {
Thread.sleep(5 * 1000)
validateEntity(entity).map[ToResponseMarshallable] {
case (body, subject) if !isEmpty(body, subject) => {
val mailResponse = sendMail(body, subject)
OK -> ProcessedEmailMessage(mailResponse)
}
case _ =>
BadRequest -> s"error: for $entity".toJson
}
} catch {
case e: DeserializationException => HttpResponse(BadRequest).withEntity(HttpEntity(s"error:${e.msg}").withContentType(ContentTypes.`application/json`))
}
}
val maxFailures: Int = 2
val callTimeout: FiniteDuration = 1 second
val resetTimeout: FiniteDuration = 30 seconds
def open: Unit = {
logger.info("Circuit Breaker is open")
}
def close: Unit = {
logger.info("Circuit Breaker is closed")
}
def halfopen: Unit = {
logger.info("Circuit Breaker is half-open, next message goes through")
private lazy val breaker = CircuitBreaker(
system.scheduler,
maxFailures,
callTimeout,
resetTimeout
).onOpen(open).onClose(close).onHalfOpen(halfopen)
def routes: Route = {
logRequestResult("email-service_aggregator_email") {
pathPrefix("v1") {
path("sendmail") {
post {
entity(as[EmailMessage]) { entity =>
complete {
breaker.withCircuitBreaker(Future(sendMail(entity)))
}
}
}
}
}
}
}
我的问题是,如果我使用 breaker.withCircuitBreaker(Future(sendMail(entity)))
,断路器会进入打开状态,但其余响应 returns There was an internal server error
作为响应
如果我改为使用 breaker.withSyncCircuitBreaker(Future(sendMail(entity)))
,则断路器永远不会进入打开状态,但它 return 是预期的 HttpResponse
关于如何解决这个问题以触发断路器和 return 正确的 HTTP 响应有什么想法吗?
entity(as[EmailMessage]) { entity => ctx =>
val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity)))
val withErrorHandling = withBreaker.recover {
case _: CircuitBreakerOpenException =>
HttpResponse(TooManyRequests).withEntity("Server Busy")
}
ctx.complete(withErrorHandling)
}
我将提供另一种可能的解决方案,因为我相信 onComplete
是完成路线时处理 Future
结果的惯用方式:
entity(as[EmailMessage]) { entity =>
val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity)))
onComplete(withBreaker){
case Success(trm) =>
complete(trm)
//Circuit breaker opened handling
case Failure(ex:CircuitBreakerOpenException) =>
complete(HttpResponse(TooManyRequests).withEntity("Server Busy"))
//General exception handling
case Failure(ex) =>
complete(InternalServerError)
}
}