Akka Streams:具有自定义逻辑的扇出运算符
Akka Streams: Fan-out operator with custom logic
我正在寻找一个 Akka Streams 运算符,它允许我根据自定义逻辑拆分流。我期望的消息集是预先知道的,因此不需要对下游消费者进行动态扩展。
在该库的早期版本中 - 当它仍被标记为实验性时 - 有一个 FlexiRoute
运算符。我看到它在某个时候积累了很多垃圾,随后被删除以支持 GraphStage
。
如今,Balance
和 Partition
等运算符已接近我的需要。 Balance
要求我为每个消费者复制逻辑。 Partition
仅适用于两个输出,我需要 N。我可以用每个消息类型 Partition
来实现它,但这看起来很老套。
构建自定义解决方案是唯一的方法吗?
分区就是你所需要的。它适用于 N 而不仅仅是 2 个输出。例如阅读https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Partition.html for API, read https://blog.colinbreck.com/partitioning-akka-streams-to-maximize-throughput/#partition。
API 文档的快照:
new Partition(outputPorts: Int, partitioner: (T) ⇒ Int, eagerCancel: Boolean)
示例快照:
val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val workerCount = 4
val partition = b.add(Partition[Int](workerCount, _ % workerCount))
val merge = b.add(Merge[Int](workerCount))
for (_ <- 1 to workerCount) {
partition ~> Flow[Int].map(spin).async ~> merge
}
FlowShape(partition.in, merge.out)
})
Source(1 to 1000)
.via(flow)
.runWith(Sink.ignore)
我正在寻找一个 Akka Streams 运算符,它允许我根据自定义逻辑拆分流。我期望的消息集是预先知道的,因此不需要对下游消费者进行动态扩展。
在该库的早期版本中 - 当它仍被标记为实验性时 - 有一个 FlexiRoute
运算符。我看到它在某个时候积累了很多垃圾,随后被删除以支持 GraphStage
。
如今,Balance
和 Partition
等运算符已接近我的需要。 Balance
要求我为每个消费者复制逻辑。 Partition
仅适用于两个输出,我需要 N。我可以用每个消息类型 Partition
来实现它,但这看起来很老套。
构建自定义解决方案是唯一的方法吗?
分区就是你所需要的。它适用于 N 而不仅仅是 2 个输出。例如阅读https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Partition.html for API, read https://blog.colinbreck.com/partitioning-akka-streams-to-maximize-throughput/#partition。
API 文档的快照:
new Partition(outputPorts: Int, partitioner: (T) ⇒ Int, eagerCancel: Boolean)
示例快照:
val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val workerCount = 4
val partition = b.add(Partition[Int](workerCount, _ % workerCount))
val merge = b.add(Merge[Int](workerCount))
for (_ <- 1 to workerCount) {
partition ~> Flow[Int].map(spin).async ~> merge
}
FlowShape(partition.in, merge.out)
})
Source(1 to 1000)
.via(flow)
.runWith(Sink.ignore)