为什么 Source.tick 在一百个 Http 请求后停止?

Why does Source.tick stop after one hundred HttpRequests?

使用 akka 流和 akka HTTP,我创建了一个流,它每 3 秒轮询一次 api,将结果解组到 JsValue 对象并将该结果发送给参与者。从下面的代码可以看出:

// Source wich performs an http request every 3 seconds.
val source = Source.tick(0.seconds,
                         3.seconds,
                         HttpRequest(uri = Uri(path = Path("/posts/1"))))

// Processes the result of the http request
val flow = Http().outgoingConnectionHttps("jsonplaceholder.typicode.com").mapAsync(1) {

  // Able to reach the API.
  case HttpResponse(StatusCodes.OK, _, entity, _) =>
    // Unmarshal the json response.
    Unmarshal(entity).to[JsValue]

  // Failed to reach the API.
  case HttpResponse(code, _, entity, _) =>
    entity.discardBytes()
    Future.successful(code.toString())


}

// Run stream
source.via(flow).runWith(Sink.actorRef[Any](processJsonActor,akka.actor.Status.Success(("Completed stream"))))

这有效,但流会在 100 个 HttpRequest(滴答)后关闭。

这种行为的原因是什么?

绝对与 outgoingConnectionHttps 有关。这是一个低级别的 DSL,可能在某处存在一些配置错误的设置,这是导致这种情况的原因(尽管我不知道是哪一个)。

docs 实际上不鼓励使用此 DSL。

尝试使用更高级别的 DSL,例如缓存连接池

  val flow = Http().cachedHostConnectionPoolHttps[NotUsed]("akka.io").mapAsync(1) {

    // Able to reach the API.
    case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
      // Unmarshal the json response.
      Unmarshal(entity).to[String]

    // Failed to reach the API.
    case (Success(HttpResponse(code, _, entity, _)), _) =>
      entity.discardBytes()
      Future.successful(code.toString())

    case (Failure(e), _) ⇒
      throw e
  }

  // Run stream
  source.map(_ → NotUsed).via(flow).runWith(...)

一个潜在的问题是 Sink.actorRef 没有背压信号,因此演员的邮箱可能已满。如果 actor 在收到 JsValue 对象时正在做一些可能需要很长时间的事情,请改用 Sink.actorRefWithAck。例如:

val initMessage = "start"
val completeMessage = "done"
val ackMessage = "ack"

source
  .via(flow)
  .runWith(Sink.actorRefWithAck[Any](
    processJsonActor, initMessage, ackMessage, completeMessage))

您需要更改 actor 以处理 initMessage 并使用 ackMessage(使用 sender ! ackMessage)回复每个流元素的流。有关 Sink.actorRefWithAck 的更多信息,请参见 here