Source.tick 的 HTTP 客户端

Http Client with Source.tick

我正在尝试将 http 客户端连接到服务器公开的 http 服务,源应该每 1 秒发送一次请求,因为我已经创建了以下部分图表:

def httpSourceGraph() = {
  Source.fromGraph(GraphDSL.create() { implicit builder =>
    val sourceOutLet = builder.add(Source.tick(FiniteDuration(0, TimeUnit.SECONDS), FiniteDuration(1,
      TimeUnit.SECONDS),
      HttpRequest(uri ="/test", method = HttpMethods.GET))).out
    // expose outlet
    SourceShape(sourceOutLet)
  })
}

def httpConnFlow() = {
  Flow.fromGraph(GraphDSL.create() { implicit builder =>

    val httpSourceFlow = builder.add(Http(system).outgoingConnection(host = "localhost", port = 8080))

    FlowShape(httpSourceFlow.in, httpSourceFlow.out)
  })
}

图表组成为

val response= httpSourceGraph.via(httpConnFlow()).runForeach(println)

如果 http 服务器 (localhost:8080/test) 已启动并且 运行,一切正常,每隔 1 秒我就可以看到服务器返回的响应。如果任一服务器关闭或稍后关闭,我将无法做出任何响应。

我认为它应该给我以下错误:

akka.stream.StreamTcpException: Tcp 命令 [Connect(localhost/127.0.0.1:8080,None,List(),Some(10 秒),true)] 失败

这也可以用一些错误来测试url。 (域名Whosebug1.com 错误url "/test")

感谢您的帮助。

-阿伦

我可以提出一种方法来获得您正在寻找的行为。我认为您问题的核心是 Http().outgoingConnection 生成的 Flow 将在遇到故障时终止。一旦发生这种情况,就不再有下游需求从 Source 拉取请求,整个流程停止。如果您想要无论连接是否丢失都将继续向下游发出元素的东西,那么您可以尝试使用主机连接池而不是单个连接。该池将对单个连接的故障更有弹性,并且它也从一开始就设置为向下游发送 SuccessFailure。使用主机连接池的流程的简化版本可以定义如下:

 val source = 
   Source.tick(
     1 second, 
     5 second, 
     (HttpRequest(uri ="/", method = HttpMethods.GET), 1)
   )

 val connFlow = Http(system).
   newHostConnectionPool[Int](host = "www.aquto.com", port = 80)

 val sink = Sink.foreach[(util.Try[HttpResponse], Int)]{
   case (util.Success(r), _ ) => 
     r.entity.toStrict(10 seconds)
     println(s"Success: ${r.status}")

   case (util.Failure(ex), _) => 
     println(s"Failure: ${ex.getMessage}")
 }

source.via(connFlow).to(sink).run

我对此进行了测试,在测试过程中拔掉了我的网络连接,这就是我看到的输出:

Success: 200 OK
Success: 200 OK
Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
Success: 200 OK
Success: 200 OK