GraphStage 形状为 2-in 2-out

GraphStage with shape of 2-in 2-out

我需要编写一个具有两个输入端口和两个输出端口的自定义 GraphStage。这 GraphStage 将允许两个原本独立的流相互影响。我可以使用什么形状? FanOutShape2 有两个输出,FanInShape2 有两个输入,但我怎样才能拥有同时具有这两个输出的形状?以某种方式结合(继承)两者?使用 BidiFlow?自己做?

我自己回答这个问题,因为这个问题已经被 discuss.lightbend.com 上有帮助的人解决了,请参阅 https://discuss.lightbend.com/t/graphstage-with-shape-of-2-in-and-2-out/4160/3

这个问题的答案就是简单地使用BidiShape。尽管名称在其他方面很暴露,但 BidiShape 背后的逻辑绝不是 bi-directional(回想起来很明显,但我被这个搞糊涂了)。

一些代码,如果有人处于类似情况,他们必须基于两个输入做一些事情,并且有可能推送到两个输出,可以参考这些代码:

class BiNoneCounter[T]() extends GraphStage[BidiShape[Option[T], Option[Int], Option[T], Option[Int]]] {
  private val leftIn = Inlet[Option[T]]("BiNoneCounter.in1")
  private val rightIn = Inlet[Option[T]]("BiNoneCounter.in2")
  private val leftOut = Outlet[Option[Int]]("BiNoneCounter.out1")
  private val rightOut = Outlet[Option[Int]]("BiNoneCounter.out2")
  override val shape = BidiShape(leftIn, leftOut, rightIn, rightOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var grabNextPush = false

    val inHandler = new InHandler {
      override def onPush(): Unit = {
        if (grabNextPush) {
          (grab(leftIn), grab(rightIn)) match {
            // do stuff here
          }
        }
        grabNextPush = !grabNextPush
      }
    }

    val outHandler = (inlet: Inlet[Option[T]]) => new OutHandler {
      override def onPull(): Unit = {
        pull(inlet)
      }
    }

    setHandler(leftOut, outHandler(leftIn))
    setHandler(rightOut, outHandler(rightIn))
    setHandler(leftIn, inHandler)
    setHandler(rightIn, inHandler)
  }
}

可以这样使用:

        sourceOne ~> bidi.in1
                     bidi.out1 ~> sinkOne
        sourceTwo ~> bidi.in2
                     bidi.out2 ~> sinkTwo