Akka Streams + Akka Http 通过流传递参数

Akka Streams + Akka Http Pass parameter through the flow

我有以下代码片段:

    case class SomeClass(param1:String,param2:String,param3:String)

    val someClassActorSource: Source[SomeClass, ActorRef] = Source
      .actorPublisher[SomeClass](Props[SomeClassActorPublisher])

    val someFlow: ActorRef = Flow[SomeClass]

        .mapAsync(3)(f=> getDocumentById(f))

        .map(f =>{
          val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
            .withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a)
            )
          (request,request)

        }).via(connection)

        //Parsing Response
        .mapAsync(3){
          case (Success(HttpResponse(status, _, entity, _)),request)=>
            entity.dataBytes.runFold(ByteString(""))(_ ++ _)
        }
        .map(resp =>parse(resp.utf8String,?????????????) )
        .to(Sink.someSink{....})
        .runWith(someClassActorSource)

    def parse(resp:String,parseParam:String)=????

在代码的某处,我正在向 Flow 发送消息:

someflow ! SomeClass("a","b","c")
someflow ! SomeClass("a1","b1","c1")

我的问题是方法解析应该使用原始案例中的 param2 class

所以第一条消息应该是

  parse(response,"b")

第二条消息应该是

  parse(response,"b1")

那么问题来了,如何从提交到流程的方法中获取参数?

假设您的 connection 值是通过

实例化的
val connection = Http().cachedHostConnectionPool(...)

您可以利用 Connection 接受一个元组这一事实,而不是简单地在元组中传递两次 request,您可以传递输入的 SomeClass。此 SomeClass 实例必须通过您的每个 Flow 值才能进入解析阶段。

稍微修改一下代码:

val getDocumentFlow = 
  Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map( d => d -> f))

你的问题没有说明 getDocumentById 中的 return 类型,所以我只是使用 Document:

val documentToRequest = 
  Flow[(Document, SomeClass)] map { case (document, someClass) =>
    val request = ...

    (request, someClass)
  }

val parseResponse = 
  Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){
    case (Success(HttpResponse(status, _, entity, _)), someClass) =>
      entity
        .dataBytes
        .runFold(ByteString(""))(_ ++ _)
        .map(e => e -> someClass)
  }

val parseEntity = Flow[(ByteString, SomeClass)] map { 
  case (entity, someClass) => parse(entity.utf8String, someClass)
}

然后可以按照问题中的描述使用这些流程:

val someFlow = 
  someClassActorSource
    .via(getDocumentFlow)
    .via(documentToRequest)
    .via(connection)
    .via(parseResponse)
    .via(parseEntity)
    .to(Sink.someSink{...})
    .run()