多个http请求的Akka流

Akka flow for multiple http requests

在我的一个项目中,我有一个 akka actor 用于向我的 google fcm 服务器发送 post 请求。 actor 获取一个 id 列表,并且应该发出与列表中一样多的请求。我在 runForeach(println(_)) 中打印出来自服务器的响应,但我只得到一个完整 ID 列表的打印输出。为什么会这样?

class FCMActor(val key: String) extends Actor{
  import fcm.FCMActor._
  import akka.pattern.pipe
  import context.dispatcher

  private implicit def system: ActorSystem = ActorSystem()
  final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))

  def buildBody(id: Option[String]): String = {
    Json.obj(
      "to" -> id,
      "priority" -> "high",
      "data" -> Json.obj("message" -> "Firebase Clud Message"),
      "time_to_live" -> 60
    ).toString()
  }

  def buildHttpRequest(body: String): HttpRequest = {
    HttpRequest(method = HttpMethods.POST,
      uri = s"/fcm/send",
      entity = HttpEntity(MediaTypes.`application/json`, body),
      headers = List(RawHeader("Authorization", s"key=$key")))
  }

  val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = {
    Http().outgoingConnection("fcm.googleapis.com")
  }

  def send(ids: List[Option[String]]) = {

    val httpRequests: List[HttpRequest] = ids.map(buildBody).map(buildHttpRequest)
    println(httpRequests)

    Source(httpRequests).via(connectionFlow).runForeach(println(_)) // << here I only get one println
  }

  override def receive: Receive = {
    case SendToIds(ids: List[Option[String]]) =>
      send(ids)

  }
}

您没有使用服务器发送给您的响应实体。要了解为什么这很重要,请查看相关的 docs page.

尝试修复此问题的快速代码更改是:

... .runForeach{ response =>
  response.discardEntityBytes()
  println(response)
}

或者,如果您真的对该实体感兴趣,可以参考

... .runForeach{ _.entity.dataBytes
  .runFold(ByteString.empty) { case (acc, b) => acc ++ b }
  .map(println(_))
}