Akka Streams:具有自定义逻辑的扇出运算符

Akka Streams: Fan-out operator with custom logic

我正在寻找一个 Akka Streams 运算符,它允许我根据自定义逻辑拆分流。我期望的消息集是预先知道的,因此不需要对下游消费者进行动态扩展。

在该库的早期版本中 - 当它仍被标记为实验性时 - 有一个 FlexiRoute 运算符。我看到它在某个时候积累了很多垃圾,随后被删除以支持 GraphStage

如今,BalancePartition 等运算符已接近我的需要。 Balance 要求我为每个消费者复制逻辑。 Partition 仅适用于两个输出,我需要 N。我可以用每个消息类型 Partition 来实现它,但这看起来很老套。

构建自定义解决方案是唯一的方法吗?

分区就是你所需要的。它适用于 N 而不仅仅是 2 个输出。例如阅读https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Partition.html for API, read https://blog.colinbreck.com/partitioning-akka-streams-to-maximize-throughput/#partition

API 文档的快照:

new Partition(outputPorts: Int, partitioner: (T) ⇒ Int, eagerCancel: Boolean)

示例快照:

val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val workerCount = 4

  val partition = b.add(Partition[Int](workerCount, _ % workerCount))
  val merge = b.add(Merge[Int](workerCount))

  for (_ <- 1 to workerCount) {
    partition ~> Flow[Int].map(spin).async ~> merge
  }

  FlowShape(partition.in, merge.out)
})

Source(1 to 1000)
  .via(flow)
  .runWith(Sink.ignore)