图中的 Akka 流反馈循环不起作用
Akka streams feedback loop in a graph is not working
我正在尝试构建一个具有重试反馈循环的图表,但是当运行遇到第一次失败时执行停止
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder ⇒
import GraphDSL.Implicits._
val in = Source(1 to 10).buffer(1, OverflowStrategy.fail)
val out = Sink.foreach(println)
// purely for logging purposes
val m2 = builder.add(Flow[Either[Int, Int]].map(i => {println("flow:" + i); i}))
val mapper = builder.add(Flow[Int].statefulMapConcat{ () =>
var retries = 0
i =>
// deliberately fails to simulate failures
// supposed to retry that element
if (i == 3) {
if (retries > 0){
List(Right(i))
} else {
retries += 1
List(Left(i))
}
} else {
List(Right(i))
}
})
val concat = builder.add(Concat[Int](2))
val broadcast = builder.add(Broadcast[Either[Int, Int]](2))
in ~> concat.in(0)
concat.out ~> mapper ~> m2 ~> broadcast
broadcast.out(1).filter(_.isRight).map(_.right.get) ~> out
broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> concat.in(1)
ClosedShape
})
graph.run()
mapper
有状态流 - 应该进行一些任意计算并跟踪重试,输出 Either 实例,成功时为右,失败时为左
稍后我尝试将失败重定向回 concat
以重试并将成功重定向到接收器。
输出如下所示:
flow:Right(1)
1
flow:Right(2)
2
flow:Left(3)
我还没弄清楚为什么它不能与 Broadcast
一起使用,但这是一个与 Partition
一起工作的版本:
val partition = builder.add(Partition[Either[Int, Int]](2, input => if (input.isRight) 0 else 1))
concat.out ~> mapper ~> m2 ~> partition.in
partition.out(0).map(_.right.get) ~> out
partition.out(1).map(_.left.get ) ~> concat.in(1)
我正在尝试构建一个具有重试反馈循环的图表,但是当运行遇到第一次失败时执行停止
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder ⇒
import GraphDSL.Implicits._
val in = Source(1 to 10).buffer(1, OverflowStrategy.fail)
val out = Sink.foreach(println)
// purely for logging purposes
val m2 = builder.add(Flow[Either[Int, Int]].map(i => {println("flow:" + i); i}))
val mapper = builder.add(Flow[Int].statefulMapConcat{ () =>
var retries = 0
i =>
// deliberately fails to simulate failures
// supposed to retry that element
if (i == 3) {
if (retries > 0){
List(Right(i))
} else {
retries += 1
List(Left(i))
}
} else {
List(Right(i))
}
})
val concat = builder.add(Concat[Int](2))
val broadcast = builder.add(Broadcast[Either[Int, Int]](2))
in ~> concat.in(0)
concat.out ~> mapper ~> m2 ~> broadcast
broadcast.out(1).filter(_.isRight).map(_.right.get) ~> out
broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> concat.in(1)
ClosedShape
})
graph.run()
mapper
有状态流 - 应该进行一些任意计算并跟踪重试,输出 Either 实例,成功时为右,失败时为左
稍后我尝试将失败重定向回 concat
以重试并将成功重定向到接收器。
输出如下所示:
flow:Right(1)
1
flow:Right(2)
2
flow:Left(3)
我还没弄清楚为什么它不能与 Broadcast
一起使用,但这是一个与 Partition
一起工作的版本:
val partition = builder.add(Partition[Either[Int, Int]](2, input => if (input.isRight) 0 else 1))
concat.out ~> mapper ~> m2 ~> partition.in
partition.out(0).map(_.right.get) ~> out
partition.out(1).map(_.left.get ) ~> concat.in(1)