Akka Stream 在 Flow 中使用 HttpResponse
Akka Stream use HttpResponse in Flow
我想利用一个简单的流程从 http 服务收集一些额外的数据,并用结果增强我的数据对象。下面说明了这个想法:
val httpClient = Http().superPool[User]()
val cityRequest = Flow[User].map { user=>
(HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))), User)
}
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), user) => user
case (Success(resp), user) => {
// << What to do here to get the value >> //
val responseData = processResponseSomehowToGetAValue?
val enhancedUser = new EnhancedUser(user.data, responseData)
enhancedUser
}
}
val processEnhancedUser = Flow[EnhancedUser].map {
// e.g.: Asynchronously save user to a database
}
val useEnhancementGraph = userSource
.via(getRequest)
.via(httpClient)
.via(getResponse)
.via(processEnhancedUser)
.to(Sink.foreach(println))
我在理解机制和两者之间的区别时遇到问题
流动的本质和物化/流动中的未来。
以下想法没有向我解释:
如何将响应中的值获取到新用户对象中,
所以我可以通过以下步骤处理该对象。
感谢您的帮助。
更新:
我正在使用远程 akka http 服务器评估代码,使用下面的代码进行解析,该服务器在立即和 10 秒之间响应请求。
这导致一些 "EnhancedUser" 个实例出现在最后,但是那些回答时间太长的实例丢失了它们的值。
我在某个时候将 .async 添加到 cityResponse 解析器的末尾,结果输出花费了更长的时间,但是是正确的。
该行为的原因是什么?它如何与已接受的答案相吻合?
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), member) => member
case (Success(response), member) => {
Unmarshal(response.entity).to[String] onComplete {
case Success(s) => member.city = Some(s)
case Failure(ex) => member.city = None
}
}
member
}.async // <<-- This changed the behavior to be correct, why?
根据您从 "cityRequestEndpoint" 获得的实体的性质,您可以使用两种不同的策略:
基于流
处理这种情况的典型方法是始终假设来自源端点的实体可以包含 N 条数据,其中 N 是事先不知道的。这通常是要遵循的模式,因为它是最通用的,因此在现实世界中 "safest"。
第一步是将来自端点的 HttpResponse
转换为数据源:
val convertResponseToByteStrSource : (Try[HttpResponse], User) => Source[(Option[ByteString], User), _] =
(response, user) => response match {
case Failure(_) => Source single (None -> user)
case Success(r) => r.entity.dataBytes map (byteStr => Some(byteStr) -> user)
}
上面的代码是我们不假设 N 的大小的地方,r.entity.dataBytes
可能是 0 ByteString
值的源,或者可能是无限数值。但我们的逻辑不在乎!
现在我们需要合并来自源的数据。这是 Flow.flatMapConcat 的一个很好的用例,它采用源流并将其转换为值流(类似于 Iterables 的 flatMap):
val cityByteStrFlow : Flow[(Try[HttpResponse], User), (Option[ByteString], User), _] =
Flow[(Try[HttpResponse], User)] flatMapConcat convertResponseToByteStrSource
剩下要做的就是将 (ByteString, User)
的元组转换为 EnhancedUser
。注意:我在下面假设 User
是 EnhancedUser
的子类,这是从问题逻辑中推断出来的:
val convertByteStringToUser : (Option[ByteString], User) => EnhancedUser =
(byteStr, user) =>
byteStr
.map(s => EnhancedUser(user.data, s))
.getOrElse(user)
val cityUserFlow : Flow[(Option[ByteString], User), EnhancedUser, _] =
Flow[(ByteString, User)] map convertByteStringToUser
现在可以组合这些组件:
val useEnhancementGraph =
userSource
.via(cityRequest)
.via(httpClient)
.via(cityByteStrFlow)
.via(cityUserFlow)
.via(processEnhancedUser)
.to(Sink foreach println)
基于未来
我们可以使用 Futures 来解决问题,类似于您在原始问题中引用的堆栈问题。我不推荐这种方法有两个原因:
- 它假定只有 1 个 ByteString 来自端点。如果端点将多个值作为 ByteString 发送,那么它们都会连接在一起,并且在创建
EnhancedUser
. 时可能会出错
- 它在 ByteString 数据的具体化上设置了人为超时,类似于
Async.await
(几乎应该始终避免)。
要使用基于 Future 的方法,对原始代码的唯一重大更改是使用 Flow.mapAsync
而不是 Flow.map
来处理 Future
正在创建的事实功能:
val parallelism = 10
val timeout : FiniteDuration = ??? //you need to specify the timeout limit
val convertResponseToFutureByteStr : (Try[HttpResponse], User) => Future[EnhancedUser] =
_ match {
case (Failure(ex), user) =>
Future successful user
case (Success(resp), user) =>
resp
.entity
.toStrict(timeout)
.map(byteStr => new EnhancedUser(user.data, byteStr))
}
val cityResponse : Flow[(Try[HttpResponse], User), EnhancedUser, _] =
Flow[(Try[HttpResponse], User)].mapAsync(parallelism)(convertResponseToFutureByteStr)
我想利用一个简单的流程从 http 服务收集一些额外的数据,并用结果增强我的数据对象。下面说明了这个想法:
val httpClient = Http().superPool[User]()
val cityRequest = Flow[User].map { user=>
(HttpRequest(uri=Uri(config.getString("cityRequestEndpoint"))), User)
}
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), user) => user
case (Success(resp), user) => {
// << What to do here to get the value >> //
val responseData = processResponseSomehowToGetAValue?
val enhancedUser = new EnhancedUser(user.data, responseData)
enhancedUser
}
}
val processEnhancedUser = Flow[EnhancedUser].map {
// e.g.: Asynchronously save user to a database
}
val useEnhancementGraph = userSource
.via(getRequest)
.via(httpClient)
.via(getResponse)
.via(processEnhancedUser)
.to(Sink.foreach(println))
我在理解机制和两者之间的区别时遇到问题 流动的本质和物化/流动中的未来。
以下想法没有向我解释:
如何将响应中的值获取到新用户对象中, 所以我可以通过以下步骤处理该对象。
感谢您的帮助。
更新:
我正在使用远程 akka http 服务器评估代码,使用下面的代码进行解析,该服务器在立即和 10 秒之间响应请求。 这导致一些 "EnhancedUser" 个实例出现在最后,但是那些回答时间太长的实例丢失了它们的值。
我在某个时候将 .async 添加到 cityResponse 解析器的末尾,结果输出花费了更长的时间,但是是正确的。
该行为的原因是什么?它如何与已接受的答案相吻合?
val cityResponse = Flow[(Try[HttpResponse], User)].map {
case (Failure(ex), member) => member
case (Success(response), member) => {
Unmarshal(response.entity).to[String] onComplete {
case Success(s) => member.city = Some(s)
case Failure(ex) => member.city = None
}
}
member
}.async // <<-- This changed the behavior to be correct, why?
根据您从 "cityRequestEndpoint" 获得的实体的性质,您可以使用两种不同的策略:
基于流
处理这种情况的典型方法是始终假设来自源端点的实体可以包含 N 条数据,其中 N 是事先不知道的。这通常是要遵循的模式,因为它是最通用的,因此在现实世界中 "safest"。
第一步是将来自端点的 HttpResponse
转换为数据源:
val convertResponseToByteStrSource : (Try[HttpResponse], User) => Source[(Option[ByteString], User), _] =
(response, user) => response match {
case Failure(_) => Source single (None -> user)
case Success(r) => r.entity.dataBytes map (byteStr => Some(byteStr) -> user)
}
上面的代码是我们不假设 N 的大小的地方,r.entity.dataBytes
可能是 0 ByteString
值的源,或者可能是无限数值。但我们的逻辑不在乎!
现在我们需要合并来自源的数据。这是 Flow.flatMapConcat 的一个很好的用例,它采用源流并将其转换为值流(类似于 Iterables 的 flatMap):
val cityByteStrFlow : Flow[(Try[HttpResponse], User), (Option[ByteString], User), _] =
Flow[(Try[HttpResponse], User)] flatMapConcat convertResponseToByteStrSource
剩下要做的就是将 (ByteString, User)
的元组转换为 EnhancedUser
。注意:我在下面假设 User
是 EnhancedUser
的子类,这是从问题逻辑中推断出来的:
val convertByteStringToUser : (Option[ByteString], User) => EnhancedUser =
(byteStr, user) =>
byteStr
.map(s => EnhancedUser(user.data, s))
.getOrElse(user)
val cityUserFlow : Flow[(Option[ByteString], User), EnhancedUser, _] =
Flow[(ByteString, User)] map convertByteStringToUser
现在可以组合这些组件:
val useEnhancementGraph =
userSource
.via(cityRequest)
.via(httpClient)
.via(cityByteStrFlow)
.via(cityUserFlow)
.via(processEnhancedUser)
.to(Sink foreach println)
基于未来
我们可以使用 Futures 来解决问题,类似于您在原始问题中引用的堆栈问题。我不推荐这种方法有两个原因:
- 它假定只有 1 个 ByteString 来自端点。如果端点将多个值作为 ByteString 发送,那么它们都会连接在一起,并且在创建
EnhancedUser
. 时可能会出错
- 它在 ByteString 数据的具体化上设置了人为超时,类似于
Async.await
(几乎应该始终避免)。
要使用基于 Future 的方法,对原始代码的唯一重大更改是使用 Flow.mapAsync
而不是 Flow.map
来处理 Future
正在创建的事实功能:
val parallelism = 10
val timeout : FiniteDuration = ??? //you need to specify the timeout limit
val convertResponseToFutureByteStr : (Try[HttpResponse], User) => Future[EnhancedUser] =
_ match {
case (Failure(ex), user) =>
Future successful user
case (Success(resp), user) =>
resp
.entity
.toStrict(timeout)
.map(byteStr => new EnhancedUser(user.data, byteStr))
}
val cityResponse : Flow[(Try[HttpResponse], User), EnhancedUser, _] =
Flow[(Try[HttpResponse], User)].mapAsync(parallelism)(convertResponseToFutureByteStr)