如何在前一阶段的Akka Stream Flow中传递参数?
How to pass parameters in Akka Stream Flow from a previous stage?
我正在玩 Akka Streams,对通过带参数的函数构建 Flow
感到好奇。参数应该从前一个流阶段传递。
假设我们有以下输入和输出:
case class InitElement(v: Int)
trait StreamResult
case class StageA(v: Int) extends StreamResult
case class StageB(v: Int) extends StreamResult
trait StreamFailure extends StreamResult { val msg: String }
case class StageAFailure(msg: String) extends StreamFailure
case class StageBFailure(msg: String) extends StreamFailure
他们的直播阶段如下:
val stageA = Flow[InitElement].map {
init =>
if (init.v > 10)
StageA(init.v)
else
StageAFailure(s"${init.v} less than 10")
}
def stageB(input: StreamResult)(externalService: Set[Int]) = Flow[StreamResult].map {
case failure: StreamFailure => failure
case StageA(v) =>
if (externalService.contains(v))
StageB(v)
else
StageBFailure(s"$v is absent in external service")
}
val graph = Source.single(InitElement(12))
.via(stageA)
//How can I pass output of previous stage into 'stageB' function?
.via(stageB(_)(Set(11, 15, 20)))
.toMat(Sink.head)(Keep.right)
我看不出有什么方法可以将 stageA
的结果传递给构造下一阶段的函数。
我该如何实施?
您对 stageB
的定义构建了一个 Flow[StreamResult, StreamResult, _]
。它不需要将 (input: StreamResult)
作为参数。请注意,您不要在 stageB
定义中的任何地方使用 input
。该元素来自 Flow[StreamResult]
,您将在其上 map
。
这应该足够了:
def stageB(externalService: Set[Int]) = … // As you wrote it
val graph = Source.single(InitElement(12))
.via(stageA)
.via(stageB(Set(11, 15, 20)))
.toMat(Sink.head)(Keep.right)
我正在玩 Akka Streams,对通过带参数的函数构建 Flow
感到好奇。参数应该从前一个流阶段传递。
假设我们有以下输入和输出:
case class InitElement(v: Int)
trait StreamResult
case class StageA(v: Int) extends StreamResult
case class StageB(v: Int) extends StreamResult
trait StreamFailure extends StreamResult { val msg: String }
case class StageAFailure(msg: String) extends StreamFailure
case class StageBFailure(msg: String) extends StreamFailure
他们的直播阶段如下:
val stageA = Flow[InitElement].map {
init =>
if (init.v > 10)
StageA(init.v)
else
StageAFailure(s"${init.v} less than 10")
}
def stageB(input: StreamResult)(externalService: Set[Int]) = Flow[StreamResult].map {
case failure: StreamFailure => failure
case StageA(v) =>
if (externalService.contains(v))
StageB(v)
else
StageBFailure(s"$v is absent in external service")
}
val graph = Source.single(InitElement(12))
.via(stageA)
//How can I pass output of previous stage into 'stageB' function?
.via(stageB(_)(Set(11, 15, 20)))
.toMat(Sink.head)(Keep.right)
我看不出有什么方法可以将 stageA
的结果传递给构造下一阶段的函数。
我该如何实施?
您对 stageB
的定义构建了一个 Flow[StreamResult, StreamResult, _]
。它不需要将 (input: StreamResult)
作为参数。请注意,您不要在 stageB
定义中的任何地方使用 input
。该元素来自 Flow[StreamResult]
,您将在其上 map
。
这应该足够了:
def stageB(externalService: Set[Int]) = … // As you wrote it
val graph = Source.single(InitElement(12))
.via(stageA)
.via(stageB(Set(11, 15, 20)))
.toMat(Sink.head)(Keep.right)