GraphStage 形状为 2-in 2-out
GraphStage with shape of 2-in 2-out
我需要编写一个具有两个输入端口和两个输出端口的自定义 GraphStage
。这 GraphStage
将允许两个原本独立的流相互影响。我可以使用什么形状? FanOutShape2
有两个输出,FanInShape2
有两个输入,但我怎样才能拥有同时具有这两个输出的形状?以某种方式结合(继承)两者?使用 BidiFlow
?自己做?
我自己回答这个问题,因为这个问题已经被 discuss.lightbend.com 上有帮助的人解决了,请参阅 https://discuss.lightbend.com/t/graphstage-with-shape-of-2-in-and-2-out/4160/3
这个问题的答案就是简单地使用BidiShape
。尽管名称在其他方面很暴露,但 BidiShape
背后的逻辑绝不是 bi-directional(回想起来很明显,但我被这个搞糊涂了)。
一些代码,如果有人处于类似情况,他们必须基于两个输入做一些事情,并且有可能推送到两个输出,可以参考这些代码:
class BiNoneCounter[T]() extends GraphStage[BidiShape[Option[T], Option[Int], Option[T], Option[Int]]] {
private val leftIn = Inlet[Option[T]]("BiNoneCounter.in1")
private val rightIn = Inlet[Option[T]]("BiNoneCounter.in2")
private val leftOut = Outlet[Option[Int]]("BiNoneCounter.out1")
private val rightOut = Outlet[Option[Int]]("BiNoneCounter.out2")
override val shape = BidiShape(leftIn, leftOut, rightIn, rightOut)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var grabNextPush = false
val inHandler = new InHandler {
override def onPush(): Unit = {
if (grabNextPush) {
(grab(leftIn), grab(rightIn)) match {
// do stuff here
}
}
grabNextPush = !grabNextPush
}
}
val outHandler = (inlet: Inlet[Option[T]]) => new OutHandler {
override def onPull(): Unit = {
pull(inlet)
}
}
setHandler(leftOut, outHandler(leftIn))
setHandler(rightOut, outHandler(rightIn))
setHandler(leftIn, inHandler)
setHandler(rightIn, inHandler)
}
}
可以这样使用:
sourceOne ~> bidi.in1
bidi.out1 ~> sinkOne
sourceTwo ~> bidi.in2
bidi.out2 ~> sinkTwo
我需要编写一个具有两个输入端口和两个输出端口的自定义 GraphStage
。这 GraphStage
将允许两个原本独立的流相互影响。我可以使用什么形状? FanOutShape2
有两个输出,FanInShape2
有两个输入,但我怎样才能拥有同时具有这两个输出的形状?以某种方式结合(继承)两者?使用 BidiFlow
?自己做?
我自己回答这个问题,因为这个问题已经被 discuss.lightbend.com 上有帮助的人解决了,请参阅 https://discuss.lightbend.com/t/graphstage-with-shape-of-2-in-and-2-out/4160/3
这个问题的答案就是简单地使用BidiShape
。尽管名称在其他方面很暴露,但 BidiShape
背后的逻辑绝不是 bi-directional(回想起来很明显,但我被这个搞糊涂了)。
一些代码,如果有人处于类似情况,他们必须基于两个输入做一些事情,并且有可能推送到两个输出,可以参考这些代码:
class BiNoneCounter[T]() extends GraphStage[BidiShape[Option[T], Option[Int], Option[T], Option[Int]]] {
private val leftIn = Inlet[Option[T]]("BiNoneCounter.in1")
private val rightIn = Inlet[Option[T]]("BiNoneCounter.in2")
private val leftOut = Outlet[Option[Int]]("BiNoneCounter.out1")
private val rightOut = Outlet[Option[Int]]("BiNoneCounter.out2")
override val shape = BidiShape(leftIn, leftOut, rightIn, rightOut)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var grabNextPush = false
val inHandler = new InHandler {
override def onPush(): Unit = {
if (grabNextPush) {
(grab(leftIn), grab(rightIn)) match {
// do stuff here
}
}
grabNextPush = !grabNextPush
}
}
val outHandler = (inlet: Inlet[Option[T]]) => new OutHandler {
override def onPull(): Unit = {
pull(inlet)
}
}
setHandler(leftOut, outHandler(leftIn))
setHandler(rightOut, outHandler(rightIn))
setHandler(leftIn, inHandler)
setHandler(rightIn, inHandler)
}
}
可以这样使用:
sourceOne ~> bidi.in1
bidi.out1 ~> sinkOne
sourceTwo ~> bidi.in2
bidi.out2 ~> sinkTwo