Source.tick 的 HTTP 客户端
Http Client with Source.tick
我正在尝试将 http 客户端连接到服务器公开的 http 服务,源应该每 1 秒发送一次请求,因为我已经创建了以下部分图表:
def httpSourceGraph() = {
Source.fromGraph(GraphDSL.create() { implicit builder =>
val sourceOutLet = builder.add(Source.tick(FiniteDuration(0, TimeUnit.SECONDS), FiniteDuration(1,
TimeUnit.SECONDS),
HttpRequest(uri ="/test", method = HttpMethods.GET))).out
// expose outlet
SourceShape(sourceOutLet)
})
}
def httpConnFlow() = {
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val httpSourceFlow = builder.add(Http(system).outgoingConnection(host = "localhost", port = 8080))
FlowShape(httpSourceFlow.in, httpSourceFlow.out)
})
}
图表组成为
val response= httpSourceGraph.via(httpConnFlow()).runForeach(println)
如果 http 服务器 (localhost:8080/test) 已启动并且 运行,一切正常,每隔 1 秒我就可以看到服务器返回的响应。如果任一服务器关闭或稍后关闭,我将无法做出任何响应。
我认为它应该给我以下错误:
akka.stream.StreamTcpException: Tcp 命令 [Connect(localhost/127.0.0.1:8080,None,List(),Some(10 秒),true)] 失败
这也可以用一些错误来测试url。 (域名Whosebug1.com 错误url "/test")
感谢您的帮助。
-阿伦
我可以提出一种方法来获得您正在寻找的行为。我认为您问题的核心是 Http().outgoingConnection
生成的 Flow
将在遇到故障时终止。一旦发生这种情况,就不再有下游需求从 Source
拉取请求,整个流程停止。如果您想要无论连接是否丢失都将继续向下游发出元素的东西,那么您可以尝试使用主机连接池而不是单个连接。该池将对单个连接的故障更有弹性,并且它也从一开始就设置为向下游发送 Success
或 Failure
。使用主机连接池的流程的简化版本可以定义如下:
val source =
Source.tick(
1 second,
5 second,
(HttpRequest(uri ="/", method = HttpMethods.GET), 1)
)
val connFlow = Http(system).
newHostConnectionPool[Int](host = "www.aquto.com", port = 80)
val sink = Sink.foreach[(util.Try[HttpResponse], Int)]{
case (util.Success(r), _ ) =>
r.entity.toStrict(10 seconds)
println(s"Success: ${r.status}")
case (util.Failure(ex), _) =>
println(s"Failure: ${ex.getMessage}")
}
source.via(connFlow).to(sink).run
我对此进行了测试,在测试过程中拔掉了我的网络连接,这就是我看到的输出:
Success: 200 OK
Success: 200 OK
Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
Success: 200 OK
Success: 200 OK
我正在尝试将 http 客户端连接到服务器公开的 http 服务,源应该每 1 秒发送一次请求,因为我已经创建了以下部分图表:
def httpSourceGraph() = {
Source.fromGraph(GraphDSL.create() { implicit builder =>
val sourceOutLet = builder.add(Source.tick(FiniteDuration(0, TimeUnit.SECONDS), FiniteDuration(1,
TimeUnit.SECONDS),
HttpRequest(uri ="/test", method = HttpMethods.GET))).out
// expose outlet
SourceShape(sourceOutLet)
})
}
def httpConnFlow() = {
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val httpSourceFlow = builder.add(Http(system).outgoingConnection(host = "localhost", port = 8080))
FlowShape(httpSourceFlow.in, httpSourceFlow.out)
})
}
图表组成为
val response= httpSourceGraph.via(httpConnFlow()).runForeach(println)
如果 http 服务器 (localhost:8080/test) 已启动并且 运行,一切正常,每隔 1 秒我就可以看到服务器返回的响应。如果任一服务器关闭或稍后关闭,我将无法做出任何响应。
我认为它应该给我以下错误:
akka.stream.StreamTcpException: Tcp 命令 [Connect(localhost/127.0.0.1:8080,None,List(),Some(10 秒),true)] 失败
这也可以用一些错误来测试url。 (域名Whosebug1.com 错误url "/test")
感谢您的帮助。
-阿伦
我可以提出一种方法来获得您正在寻找的行为。我认为您问题的核心是 Http().outgoingConnection
生成的 Flow
将在遇到故障时终止。一旦发生这种情况,就不再有下游需求从 Source
拉取请求,整个流程停止。如果您想要无论连接是否丢失都将继续向下游发出元素的东西,那么您可以尝试使用主机连接池而不是单个连接。该池将对单个连接的故障更有弹性,并且它也从一开始就设置为向下游发送 Success
或 Failure
。使用主机连接池的流程的简化版本可以定义如下:
val source =
Source.tick(
1 second,
5 second,
(HttpRequest(uri ="/", method = HttpMethods.GET), 1)
)
val connFlow = Http(system).
newHostConnectionPool[Int](host = "www.aquto.com", port = 80)
val sink = Sink.foreach[(util.Try[HttpResponse], Int)]{
case (util.Success(r), _ ) =>
r.entity.toStrict(10 seconds)
println(s"Success: ${r.status}")
case (util.Failure(ex), _) =>
println(s"Failure: ${ex.getMessage}")
}
source.via(connFlow).to(sink).run
我对此进行了测试,在测试过程中拔掉了我的网络连接,这就是我看到的输出:
Success: 200 OK
Success: 200 OK
Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
Success: 200 OK
Success: 200 OK