图中的 Akka 流反馈循环不起作用

Akka streams feedback loop in a graph is not working

我正在尝试构建一个具有重试反馈循环的图表,但是当运行遇到第一次失败时执行停止

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder ⇒
import GraphDSL.Implicits._

val in = Source(1 to 10).buffer(1, OverflowStrategy.fail)
val out = Sink.foreach(println)

// purely for logging purposes
val m2 = builder.add(Flow[Either[Int, Int]].map(i => {println("flow:" + i); i}))


val mapper = builder.add(Flow[Int].statefulMapConcat{ () =>
  var retries = 0
  i =>

  // deliberately fails to simulate failures
  // supposed to retry that element
  if (i == 3) {
    if (retries > 0){
      List(Right(i))
    } else {
      retries += 1
      List(Left(i))
    }
  } else {
    List(Right(i))
  }
})

val concat = builder.add(Concat[Int](2))

val broadcast = builder.add(Broadcast[Either[Int, Int]](2))


in ~> concat.in(0)

concat.out ~> mapper ~> m2 ~> broadcast

broadcast.out(1).filter(_.isRight).map(_.right.get) ~> out
broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> concat.in(1)


ClosedShape
})

graph.run()

mapper 有状态流 - 应该进行一些任意计算并跟踪重试,输出 Either 实例,成功时为右,失败时为左

稍后我尝试将失败重定向回 concat 以重试并将成功重定向到接收器。

输出如下所示:

flow:Right(1)
1
flow:Right(2)
2
flow:Left(3)

我还没弄清楚为什么它不能与 Broadcast 一起使用,但这是一个与 Partition 一起工作的版本:

val partition = builder.add(Partition[Either[Int, Int]](2, input => if (input.isRight) 0 else 1))
concat.out ~> mapper ~> m2 ~> partition.in
partition.out(0).map(_.right.get) ~> out
partition.out(1).map(_.left.get ) ~> concat.in(1)