Akka流通过流量限制处理流的并行度/吞吐量
Akka streams pass through flow limiting Parallelism / throughput of processing flow
我有一个用例,我想向外部系统发送消息,但发送此消息的流程采用 returns 我无法在下游使用的类型。这是传递流的一个很好的用例。我正在使用实现 here。最初我担心如果 processingFlow 使用 mapAsyncUnordered 那么这个流程将无法工作。由于处理流程可能会重新排序消息,并且 zip 可能会推出具有不正确对的元组。例如在下面的例子中。
val testSource = Source(1 until 50)
val processingFlow: Flow[Int, Int, NotUsed] = Flow[Int].mapAsyncUnordered(10)(x => Future {
Thread.sleep(Random.nextInt(50))
x * 10
})
val passThroughFlow = PassThroughFlow(processingFlow, Keep.both)
val future = testSource.via(passThroughFlow).runWith(Sink.seq)
我希望处理流程可以根据其输入重新排序其输出,我会得到如下结果:
[(30,1), (40,2),(10,3),(10,4), ...]
右边(通过的总是按顺序排列的)但是通过我的 mapAsyncUnordered 的左边可能与不正确的元素连接,从而形成错误的元组。
相反,我实际上得到:
[(10,1), (20,2),(30,3),(40,4), ...]
每一次。经过进一步调查,我注意到代码 运行ning 很慢,事实上它根本不是 运行ning 并行,尽管我的地图异步无序。我尝试在前后引入一个缓冲区以及一个异步边界,但它似乎总是按顺序 运行。这解释了为什么它总是有序的,但我希望我的处理流程具有更高的吞吐量。
我想出了以下解决方法:
object PassThroughFlow {
def keepRight[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
keepBoth[A, A1](processingFlow).map(_._2)
def keepBoth[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder => {
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[A](2))
val zip = builder.add(ZipWith[A1, A, (A1, A)]((left, right) => (left, right)))
broadcast.out(0) ~> processingFlow ~> zip.in0
broadcast.out(1) ~> zip.in1
FlowShape(broadcast.in, zip.out)
}
})
}
object ParallelPassThroughFlow {
def keepRight[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
keepBoth(parallelism, processingFlow).map(_._2)
def keepBoth[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val fanOut = builder.add(Balance[A](outputPorts = parallelism))
val merger = builder.add(Merge[(A1, A)](inputPorts = parallelism, eagerComplete = false))
Range(0, parallelism).foreach { n =>
val passThrough = PassThroughFlow.keepBoth(processingFlow)
fanOut.out(n) ~> passThrough ~> merger.in(n)
}
FlowShape(fanOut.in, merger.out)
})
}
}
两个问题:
- 在original implementation中,为什么pass through flow里面的zip会限制map async unordered的并行度?
- 我的工作是否合理或者是否可以改进?我基本上将我的输入扇出到传递流的多个堆栈中,然后将它们全部合并回一起。它似乎具有我想要的属性(并行但即使处理流程重新排序也能保持顺序)但感觉有些不对 right
您看到的行为是 broadcast
和 zip
工作方式的结果:broadcast
在其所有输出都发出需求信号时向下游发射; zip
在发出需求信号(并向下游发射)之前等待其所有输入。
broadcast.out(0) ~> processingFlow ~> zip.in0
broadcast.out(1) ~> zip.in1
通过上图考虑第一个元素(1
)的移动。 1
向 processingFlow
和 zip
广播。 zip
立即接收其输入之一 (1
) 并等待其另一个输入 (10
),这将需要更长的时间到达。只有当 zip
同时获得 1
和 10
时,它才会从上游拉取更多元素,从而触发第二个元素 (2
) 通过流的移动。等等。
至于你的ParallelPassThroughFlow
,我不知道为什么"something doesn't feel right"给你。
我有一个用例,我想向外部系统发送消息,但发送此消息的流程采用 returns 我无法在下游使用的类型。这是传递流的一个很好的用例。我正在使用实现 here。最初我担心如果 processingFlow 使用 mapAsyncUnordered 那么这个流程将无法工作。由于处理流程可能会重新排序消息,并且 zip 可能会推出具有不正确对的元组。例如在下面的例子中。
val testSource = Source(1 until 50)
val processingFlow: Flow[Int, Int, NotUsed] = Flow[Int].mapAsyncUnordered(10)(x => Future {
Thread.sleep(Random.nextInt(50))
x * 10
})
val passThroughFlow = PassThroughFlow(processingFlow, Keep.both)
val future = testSource.via(passThroughFlow).runWith(Sink.seq)
我希望处理流程可以根据其输入重新排序其输出,我会得到如下结果:
[(30,1), (40,2),(10,3),(10,4), ...]
右边(通过的总是按顺序排列的)但是通过我的 mapAsyncUnordered 的左边可能与不正确的元素连接,从而形成错误的元组。
相反,我实际上得到:
[(10,1), (20,2),(30,3),(40,4), ...]
每一次。经过进一步调查,我注意到代码 运行ning 很慢,事实上它根本不是 运行ning 并行,尽管我的地图异步无序。我尝试在前后引入一个缓冲区以及一个异步边界,但它似乎总是按顺序 运行。这解释了为什么它总是有序的,但我希望我的处理流程具有更高的吞吐量。
我想出了以下解决方法:
object PassThroughFlow {
def keepRight[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
keepBoth[A, A1](processingFlow).map(_._2)
def keepBoth[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder => {
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[A](2))
val zip = builder.add(ZipWith[A1, A, (A1, A)]((left, right) => (left, right)))
broadcast.out(0) ~> processingFlow ~> zip.in0
broadcast.out(1) ~> zip.in1
FlowShape(broadcast.in, zip.out)
}
})
}
object ParallelPassThroughFlow {
def keepRight[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
keepBoth(parallelism, processingFlow).map(_._2)
def keepBoth[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val fanOut = builder.add(Balance[A](outputPorts = parallelism))
val merger = builder.add(Merge[(A1, A)](inputPorts = parallelism, eagerComplete = false))
Range(0, parallelism).foreach { n =>
val passThrough = PassThroughFlow.keepBoth(processingFlow)
fanOut.out(n) ~> passThrough ~> merger.in(n)
}
FlowShape(fanOut.in, merger.out)
})
}
}
两个问题:
- 在original implementation中,为什么pass through flow里面的zip会限制map async unordered的并行度?
- 我的工作是否合理或者是否可以改进?我基本上将我的输入扇出到传递流的多个堆栈中,然后将它们全部合并回一起。它似乎具有我想要的属性(并行但即使处理流程重新排序也能保持顺序)但感觉有些不对 right
您看到的行为是 broadcast
和 zip
工作方式的结果:broadcast
在其所有输出都发出需求信号时向下游发射; zip
在发出需求信号(并向下游发射)之前等待其所有输入。
broadcast.out(0) ~> processingFlow ~> zip.in0
broadcast.out(1) ~> zip.in1
通过上图考虑第一个元素(1
)的移动。 1
向 processingFlow
和 zip
广播。 zip
立即接收其输入之一 (1
) 并等待其另一个输入 (10
),这将需要更长的时间到达。只有当 zip
同时获得 1
和 10
时,它才会从上游拉取更多元素,从而触发第二个元素 (2
) 通过流的移动。等等。
至于你的ParallelPassThroughFlow
,我不知道为什么"something doesn't feel right"给你。