Akka Streams + Akka Http 通过流传递参数
Akka Streams + Akka Http Pass parameter through the flow
我有以下代码片段:
case class SomeClass(param1:String,param2:String,param3:String)
val someClassActorSource: Source[SomeClass, ActorRef] = Source
.actorPublisher[SomeClass](Props[SomeClassActorPublisher])
val someFlow: ActorRef = Flow[SomeClass]
.mapAsync(3)(f=> getDocumentById(f))
.map(f =>{
val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
.withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a)
)
(request,request)
}).via(connection)
//Parsing Response
.mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)),request)=>
entity.dataBytes.runFold(ByteString(""))(_ ++ _)
}
.map(resp =>parse(resp.utf8String,?????????????) )
.to(Sink.someSink{....})
.runWith(someClassActorSource)
def parse(resp:String,parseParam:String)=????
在代码的某处,我正在向 Flow 发送消息:
someflow ! SomeClass("a","b","c")
someflow ! SomeClass("a1","b1","c1")
我的问题是方法解析应该使用原始案例中的 param2 class
所以第一条消息应该是
parse(response,"b")
第二条消息应该是
parse(response,"b1")
那么问题来了,如何从提交到流程的方法中获取参数?
假设您的 connection
值是通过
实例化的
val connection = Http().cachedHostConnectionPool(...)
您可以利用 Connection 接受一个元组这一事实,而不是简单地在元组中传递两次 request
,您可以传递输入的 SomeClass
。此 SomeClass
实例必须通过您的每个 Flow
值才能进入解析阶段。
稍微修改一下代码:
val getDocumentFlow =
Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map( d => d -> f))
你的问题没有说明 getDocumentById
中的 return 类型,所以我只是使用 Document
:
val documentToRequest =
Flow[(Document, SomeClass)] map { case (document, someClass) =>
val request = ...
(request, someClass)
}
val parseResponse =
Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)), someClass) =>
entity
.dataBytes
.runFold(ByteString(""))(_ ++ _)
.map(e => e -> someClass)
}
val parseEntity = Flow[(ByteString, SomeClass)] map {
case (entity, someClass) => parse(entity.utf8String, someClass)
}
然后可以按照问题中的描述使用这些流程:
val someFlow =
someClassActorSource
.via(getDocumentFlow)
.via(documentToRequest)
.via(connection)
.via(parseResponse)
.via(parseEntity)
.to(Sink.someSink{...})
.run()
我有以下代码片段:
case class SomeClass(param1:String,param2:String,param3:String)
val someClassActorSource: Source[SomeClass, ActorRef] = Source
.actorPublisher[SomeClass](Props[SomeClassActorPublisher])
val someFlow: ActorRef = Flow[SomeClass]
.mapAsync(3)(f=> getDocumentById(f))
.map(f =>{
val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
.withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a)
)
(request,request)
}).via(connection)
//Parsing Response
.mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)),request)=>
entity.dataBytes.runFold(ByteString(""))(_ ++ _)
}
.map(resp =>parse(resp.utf8String,?????????????) )
.to(Sink.someSink{....})
.runWith(someClassActorSource)
def parse(resp:String,parseParam:String)=????
在代码的某处,我正在向 Flow 发送消息:
someflow ! SomeClass("a","b","c")
someflow ! SomeClass("a1","b1","c1")
我的问题是方法解析应该使用原始案例中的 param2 class
所以第一条消息应该是
parse(response,"b")
第二条消息应该是
parse(response,"b1")
那么问题来了,如何从提交到流程的方法中获取参数?
假设您的 connection
值是通过
val connection = Http().cachedHostConnectionPool(...)
您可以利用 Connection 接受一个元组这一事实,而不是简单地在元组中传递两次 request
,您可以传递输入的 SomeClass
。此 SomeClass
实例必须通过您的每个 Flow
值才能进入解析阶段。
稍微修改一下代码:
val getDocumentFlow =
Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map( d => d -> f))
你的问题没有说明 getDocumentById
中的 return 类型,所以我只是使用 Document
:
val documentToRequest =
Flow[(Document, SomeClass)] map { case (document, someClass) =>
val request = ...
(request, someClass)
}
val parseResponse =
Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)), someClass) =>
entity
.dataBytes
.runFold(ByteString(""))(_ ++ _)
.map(e => e -> someClass)
}
val parseEntity = Flow[(ByteString, SomeClass)] map {
case (entity, someClass) => parse(entity.utf8String, someClass)
}
然后可以按照问题中的描述使用这些流程:
val someFlow =
someClassActorSource
.via(getDocumentFlow)
.via(documentToRequest)
.via(connection)
.via(parseResponse)
.via(parseEntity)
.to(Sink.someSink{...})
.run()