如何在前一阶段的Akka Stream Flow中传递参数?

How to pass parameters in Akka Stream Flow from a previous stage?

我正在玩 Akka Streams,对通过带参数的函数构建 Flow 感到好奇。参数应该从前一个流阶段传递。

假设我们有以下输入和输出:

case class InitElement(v: Int)

trait StreamResult
case class StageA(v: Int) extends StreamResult
case class StageB(v: Int) extends StreamResult

trait StreamFailure extends StreamResult { val msg: String }
case class StageAFailure(msg: String) extends StreamFailure
case class StageBFailure(msg: String) extends StreamFailure

他们的直播阶段如下:

val stageA = Flow[InitElement].map {
    init => 
      if (init.v > 10) 
        StageA(init.v) 
      else 
        StageAFailure(s"${init.v} less than 10")
  }

  def stageB(input: StreamResult)(externalService: Set[Int]) = Flow[StreamResult].map {
    case failure: StreamFailure => failure
    case StageA(v) => 
      if (externalService.contains(v)) 
        StageB(v)
      else
        StageBFailure(s"$v is absent in external service")
  }

  val graph = Source.single(InitElement(12))
    .via(stageA)
//How can I pass output of previous stage into 'stageB' function?
    .via(stageB(_)(Set(11, 15, 20)))
    .toMat(Sink.head)(Keep.right)

我看不出有什么方法可以将 stageA 的结果传递给构造下一阶段的函数。

我该如何实施?

您对 stageB 的定义构建了一个 Flow[StreamResult, StreamResult, _]。它不需要将 (input: StreamResult) 作为参数。请注意,您不要在 stageB 定义中的任何地方使用 input。该元素来自 Flow[StreamResult],您将在其上 map

这应该足够了:

def stageB(externalService: Set[Int]) = … // As you wrote it

val graph = Source.single(InitElement(12))
  .via(stageA)
  .via(stageB(Set(11, 15, 20)))
  .toMat(Sink.head)(Keep.right)