定义自定义分区阶段的问题(无法两次拉入端口)
Issue with defining custom partition stage (Cannot pull in port twice)
所以我有这个用于在 Akka Streams 中进行分区的自定义阶段。
object CustomPartitioner {
/**
* Creates a Partition stage that, given a type A, makes a decision to whether to partition to subtype B or subtype C
*
* @param partitionF applies function, if true, route to B, otherwise route to C.
*
* @tparam A type of input
* @tparam B type of output on the first outlet.
* @tparam C type of output on the second outlet.
*
* @return A partition stage
*/
def apply[A, B, C](partitionF: A => Either[B, C]) =
new GraphStage[FanOutShape2[A, B, C]] {
private val in: Inlet[A] = Inlet[A]("in")
private val outB = Outlet[B]("outB")
private val outC = Outlet[C]("outC")
private val pendingB = MutableQueue.empty[B]
private val pendingC = MutableQueue.empty[C]
override def shape: FanOutShape2[A, B, C] = new FanOutShape2(in, outB, outC)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
setHandler(in, this)
setHandler(outB, this)
setHandler(outC, this)
override def onPush(): Unit = {
val elem = grab(in)
partitionF(elem) match {
case Left(b) =>
pendingB.enqueue(b)
tryPush(outB, pendingB, b)
case Right(c) =>
pendingC.enqueue(c)
tryPush(outC, pendingC, c)
}
}
override def onPull(): Unit = pull(in)
private def tryPush[T](out: Outlet[T], pending: MutableQueue[T]): Unit =
if (isAvailable(out) && pending.nonEmpty) push(out, pending.dequeue())
}
}
我已将其作为分区程序连接到流程中,然后将其合并回接收器中。
当我尝试使用组件测试通过流推送消息时
java.lang.IllegalArgumentException: Cannot pull port (in(256390569)) twice
然后测试失败
java.lang.AssertionError: assertion failed: expected: expecting request() signal but got unexpected message CancelSubscription(PublisherProbeSubscription(akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$$anon@53c99b09,akka.testkit.TestProbe@2539cd1c))
我很确定我弄乱了 setHandler 调用,因为有两个调用同时处理 outB 和 outC。但是我不知道如何修复它,使整个系统只调用 onPush 和 onPull 一次。
我设法在
之前让它开始工作
override def onPull(): Unit =
if (!hasBeenPulled(in))
pull(in)
所以我有这个用于在 Akka Streams 中进行分区的自定义阶段。
object CustomPartitioner {
/**
* Creates a Partition stage that, given a type A, makes a decision to whether to partition to subtype B or subtype C
*
* @param partitionF applies function, if true, route to B, otherwise route to C.
*
* @tparam A type of input
* @tparam B type of output on the first outlet.
* @tparam C type of output on the second outlet.
*
* @return A partition stage
*/
def apply[A, B, C](partitionF: A => Either[B, C]) =
new GraphStage[FanOutShape2[A, B, C]] {
private val in: Inlet[A] = Inlet[A]("in")
private val outB = Outlet[B]("outB")
private val outC = Outlet[C]("outC")
private val pendingB = MutableQueue.empty[B]
private val pendingC = MutableQueue.empty[C]
override def shape: FanOutShape2[A, B, C] = new FanOutShape2(in, outB, outC)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
setHandler(in, this)
setHandler(outB, this)
setHandler(outC, this)
override def onPush(): Unit = {
val elem = grab(in)
partitionF(elem) match {
case Left(b) =>
pendingB.enqueue(b)
tryPush(outB, pendingB, b)
case Right(c) =>
pendingC.enqueue(c)
tryPush(outC, pendingC, c)
}
}
override def onPull(): Unit = pull(in)
private def tryPush[T](out: Outlet[T], pending: MutableQueue[T]): Unit =
if (isAvailable(out) && pending.nonEmpty) push(out, pending.dequeue())
}
}
我已将其作为分区程序连接到流程中,然后将其合并回接收器中。
当我尝试使用组件测试通过流推送消息时
java.lang.IllegalArgumentException: Cannot pull port (in(256390569)) twice
然后测试失败
java.lang.AssertionError: assertion failed: expected: expecting request() signal but got unexpected message CancelSubscription(PublisherProbeSubscription(akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$$anon@53c99b09,akka.testkit.TestProbe@2539cd1c))
我很确定我弄乱了 setHandler 调用,因为有两个调用同时处理 outB 和 outC。但是我不知道如何修复它,使整个系统只调用 onPush 和 onPull 一次。
我设法在
之前让它开始工作override def onPull(): Unit =
if (!hasBeenPulled(in))
pull(in)