识别 Akka HttpRequest 和 HttpResponse?
Identify Akka HttpRequest and HttpResponse?
在使用 Akka HttpRequest 并将请求通过管道传递给参与者时,我无法识别响应。
参与者将处理每条将收到的消息,但它不知道哪个请求用于获取此响应。有什么方法可以识别每个请求以匹配响应?
注意:我没有服务器可以再次重新发送请求正文的任何部分。
提前致谢
MySelf.scala
import akka.actor.{ Actor, ActorLogging }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString
class Myself extends Actor with ActorLogging {
import akka.pattern.pipe
import context.dispatcher
final implicit val materializer: ActorMaterializer =
ActorMaterializer(ActorMaterializerSettings(context.system))
def receive = {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
log.info("Got response, body: " + body.utf8String)
}
case resp @ HttpResponse(code, _, _, _) =>
log.info("Request failed, response code: " + code)
resp.discardEntityBytes()
}
}
Main.scala
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
object HttpServerMain extends App {
import akka.pattern.pipe
// import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val http = Http(system)
val myActor = system.actorOf(Props[MySelf])
http.singleRequest(HttpRequest(uri = "http://akka.io"))
.pipeTo(myActor)
http.singleRequest(HttpRequest(uri = "http://akka.io/another-request"))
.pipeTo(myActor)
Thread.sleep(2000)
system.terminate()
您可以简单地使用 map
来转换 Future
并在将其通过管道传输到 myActor
之前向其添加某种 ID(为此通常称为相关 ID):
http.singleRequest(HttpRequest(uri = "http://akka.io"))
.map(x => (1, x)).pipeTo(myActor)
您需要更改模式匹配块以获取元组:
case (id, HttpResponse(StatusCodes.OK, headers, entity, _)) =>
如果您不能t/don出于某种原因不想更改您的模式匹配块,您可以使用相同的方法,而是将唯一的 HTTP header 添加到您完成的请求中(使用 copy
) 用这样的东西(如果编译不检查):
// make a unique header name that you are sure will not be
// received from http response:
val correlationHeader: HttpHeader = ... // mycustomheader
// Basically hack the response to add your header:
http.singleRequest(HttpRequest(uri = "http://akka.io"))
.map(x => x.copy(headers = correlationHeader +: headers)).pipeTo(myActor)
// Now you can check your header to see which response that was:
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
headers.find(_.is("mycustomheader")).map(_.value).getOrElse("NA")
与之前的选项相比,这更像是一种黑客攻击,因为您正在修改响应。
我认为你不能直接使用 pipeTo
来做到这一点,因为它本质上只是 andThen
调用你的 Future
。一种选择是 map
然后发送一个 (request, response)
元组给演员:
val request = HttpRequest(uri = "http://akka.io")
http.singleRequest(request).map {
response => myActor ! (request, response)
}
class Myself extends Actor with ActorLogging {
...
def receive = {
case (request, HttpResponse(StatusCodes.OK, headers, entity, _)) =>
...
case (request, resp @ HttpResponse(code, _, _, _)) =>
log.info(request.toString)
...
}
}
在使用 Akka HttpRequest 并将请求通过管道传递给参与者时,我无法识别响应。 参与者将处理每条将收到的消息,但它不知道哪个请求用于获取此响应。有什么方法可以识别每个请求以匹配响应?
注意:我没有服务器可以再次重新发送请求正文的任何部分。
提前致谢
MySelf.scala
import akka.actor.{ Actor, ActorLogging }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString
class Myself extends Actor with ActorLogging {
import akka.pattern.pipe
import context.dispatcher
final implicit val materializer: ActorMaterializer =
ActorMaterializer(ActorMaterializerSettings(context.system))
def receive = {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
log.info("Got response, body: " + body.utf8String)
}
case resp @ HttpResponse(code, _, _, _) =>
log.info("Request failed, response code: " + code)
resp.discardEntityBytes()
}
}
Main.scala
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
object HttpServerMain extends App {
import akka.pattern.pipe
// import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val http = Http(system)
val myActor = system.actorOf(Props[MySelf])
http.singleRequest(HttpRequest(uri = "http://akka.io"))
.pipeTo(myActor)
http.singleRequest(HttpRequest(uri = "http://akka.io/another-request"))
.pipeTo(myActor)
Thread.sleep(2000)
system.terminate()
您可以简单地使用 map
来转换 Future
并在将其通过管道传输到 myActor
之前向其添加某种 ID(为此通常称为相关 ID):
http.singleRequest(HttpRequest(uri = "http://akka.io"))
.map(x => (1, x)).pipeTo(myActor)
您需要更改模式匹配块以获取元组:
case (id, HttpResponse(StatusCodes.OK, headers, entity, _)) =>
如果您不能t/don出于某种原因不想更改您的模式匹配块,您可以使用相同的方法,而是将唯一的 HTTP header 添加到您完成的请求中(使用 copy
) 用这样的东西(如果编译不检查):
// make a unique header name that you are sure will not be
// received from http response:
val correlationHeader: HttpHeader = ... // mycustomheader
// Basically hack the response to add your header:
http.singleRequest(HttpRequest(uri = "http://akka.io"))
.map(x => x.copy(headers = correlationHeader +: headers)).pipeTo(myActor)
// Now you can check your header to see which response that was:
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
headers.find(_.is("mycustomheader")).map(_.value).getOrElse("NA")
与之前的选项相比,这更像是一种黑客攻击,因为您正在修改响应。
我认为你不能直接使用 pipeTo
来做到这一点,因为它本质上只是 andThen
调用你的 Future
。一种选择是 map
然后发送一个 (request, response)
元组给演员:
val request = HttpRequest(uri = "http://akka.io")
http.singleRequest(request).map {
response => myActor ! (request, response)
}
class Myself extends Actor with ActorLogging {
...
def receive = {
case (request, HttpResponse(StatusCodes.OK, headers, entity, _)) =>
...
case (request, resp @ HttpResponse(code, _, _, _)) =>
log.info(request.toString)
...
}
}