使用 Akka-Http 流进行轮询
Polling with Akka-Http stream
我找到了一个 [example][1],其中 akka-http 与 Source.single 一起使用来发出请求。现在我想使用 Source.tick 来实现每 X 秒执行一次的轮询请求,如下所示:
import akka.http.scaladsl.model._
import scala.concurrent.duration._
val request: HttpRequest = RequestBuilding.Get(Uri("http://api.someSite.com"))
val source: Source[HttpRequest, Cancellable] = Source.tick(1.seconds, 1.seconds, request)
val sourceWithDest = source.via(Http().superPool())
但是,我在最后一行遇到了一个我无法解决的编译错误(类型不匹配)。关于我做错了什么的任何想法或替代方案的建议?
[1]: https://gist.github.com/steinybot/a1f79fe9a67693722164
根据 docs:
The Flow returned by Http().superPool(...) is very similar to the one
from the Host-Level Client-Side API, so the Using a Host Connection
Pool section also applies here.
和then
The “pool client flow” returned by
Http().cachedHostConnectionPool(...) has the following type:
Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]
这是为了让客户端代码有可能实现一些逻辑来匹配原始请求和相应的响应。假设您在您的案例中不需要这种行为,您始终可以在将请求提供给池流之前将 NotUsed
附加到您的请求中。例如
val sourceWithDest: Source[Try[HttpResponse], Cancellable] =
source.map(req ⇒ (req, NotUsed)).via(Http().superPool[NotUsed]()).map(_._1)
我找到了一个 [example][1],其中 akka-http 与 Source.single 一起使用来发出请求。现在我想使用 Source.tick 来实现每 X 秒执行一次的轮询请求,如下所示:
import akka.http.scaladsl.model._
import scala.concurrent.duration._
val request: HttpRequest = RequestBuilding.Get(Uri("http://api.someSite.com"))
val source: Source[HttpRequest, Cancellable] = Source.tick(1.seconds, 1.seconds, request)
val sourceWithDest = source.via(Http().superPool())
但是,我在最后一行遇到了一个我无法解决的编译错误(类型不匹配)。关于我做错了什么的任何想法或替代方案的建议? [1]: https://gist.github.com/steinybot/a1f79fe9a67693722164
根据 docs:
The Flow returned by Http().superPool(...) is very similar to the one from the Host-Level Client-Side API, so the Using a Host Connection Pool section also applies here.
和then
The “pool client flow” returned by Http().cachedHostConnectionPool(...) has the following type:
Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]
这是为了让客户端代码有可能实现一些逻辑来匹配原始请求和相应的响应。假设您在您的案例中不需要这种行为,您始终可以在将请求提供给池流之前将 NotUsed
附加到您的请求中。例如
val sourceWithDest: Source[Try[HttpResponse], Cancellable] =
source.map(req ⇒ (req, NotUsed)).via(Http().superPool[NotUsed]()).map(_._1)