如何在 Akka 流中合并任意数量的源?

How can I merge an arbitrary number of sources in Akka stream?

我有 n 个源,我想在 Akka 流中按优先级合并。我的实施基于 GraphMergePrioritiziedSpec,其中合并了三个优先来源。我试图通过以下方式抽象出 Source 的数量:

import akka.NotUsed
import akka.stream.{ClosedShape, Graph, Materializer}
import akka.stream.scaladsl.{GraphDSL, MergePrioritized, RunnableGraph, Sink, Source}
import org.apache.activemq.ActiveMQConnectionFactory

class SourceMerger(
  sources: Seq[Source[java.io.Serializable, NotUsed]],
  priorities: Seq[Int],
  private val sink: Sink[java.io.Serializable, _]
) {

  require(sources.size == priorities.size, "Each source should have a priority")

  import GraphDSL.Implicits._

  private def partial(
    sources: Seq[Source[java.io.Serializable, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[java.io.Serializable, _]
  ): Graph[ClosedShape, NotUsed] = GraphDSL.create() { implicit b =>

      val merge = b.add(MergePrioritized[java.io.Serializable](priorities))

      sources.zipWithIndex.foreach { case (s, i) =>
        s.shape.out ~> merge.in(i)
      }

      merge.out ~> sink
      ClosedShape
  }

  def merge(
    sources: Seq[Source[java.io.Serializable, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[java.io.Serializable, _]
  ): RunnableGraph[NotUsed] = RunnableGraph.fromGraph(partial(sources, priorities, sink))

  def run()(implicit mat: Materializer): NotUsed = merge(sources, priorities, sink).run()(mat)
}

但是,当 运行 以下存根时出现错误:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.{Matchers, WordSpecLike}
import akka.testkit.TestKit

import scala.collection.immutable.Iterable

class SourceMergerSpec extends TestKit(ActorSystem("SourceMerger")) with WordSpecLike with Matchers {

  implicit val materializer: Materializer = ActorMaterializer()

  "A SourceMerger" should {
    "merge by priority" in {

      val priorities: Seq[Int] = Seq(1,2,3)

      val highPriority = Iterable("message1", "message2", "message3")
      val mediumPriority = Iterable("message4", "message5", "message6")
      val lowPriority = Iterable("message7", "message8", "message9")

      val source1 = Source[String](highPriority)
      val source2 = Source[String](mediumPriority)
      val source3 = Source[String](lowPriority)

      val sources = Seq(source1, source2, source3)

      val subscriber = Sink.seq[java.io.Serializable]

      val merger = new SourceMerger(sources, priorities, subscriber)

      merger.run()

      source1.runWith(Sink.foreach(println))
    }
  }

}

相关的堆栈跟踪在这里:

[StatefulMapConcat.out] is already connected
java.lang.IllegalArgumentException: [StatefulMapConcat.out] is already connected
    at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1304)
    at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase$class.$tilde$greater(Graph.scala:1431)
    at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1521)
    at SourceMerger$$anonfun$partial$$anonfun$apply.apply(SourceMerger.scala:26)
    at SourceMerger$$anonfun$partial$$anonfun$apply.apply(SourceMerger.scala:25)

好像是这个错误:

sources.zipWithIndex.foreach { case (s, i) =>
  s.shape.out ~> merge.in(i)
}

是否可以在 Akka 流图 DSL 中合并任意数量的 Source?如果是,为什么我的尝试没有成功?

代码示例的主要问题

问题中提供的代码片段的一个大问题是 source1merge 调用和 Sink.foreach(println) 连接到 Sink。如果没有 intermediate fan-out element.

,同一个 Source 无法连接到多个接收器

删除 Sink.foreach(println) 可能会彻底解决您的问题。

简化设计

基于来自特定 Source 的所有消息都具有相同优先级这一事实,可以简化合并。这意味着您可以按各自的优先级对源进行排序,然后将它们连接在一起:

private def partial(sources: Seq[Source[java.io.Serializable, NotUsed]],
                    priorities: Seq[Int],
                    sink: Sink[java.io.Serializable, _]): RunnableGraph[NotUsed] = 
   sources.zip(priorities)
          .sortWith(_._2 < _._2)
          .map(_._1)
          .reduceOption(_ ++ _)
          .getOrElse(Source.empty[java.io.Serializable])
          .to(sink)

如果我替换

,您的代码运行没有错误
  sources.zipWithIndex.foreach { case (s, i) =>
    s.shape.out ~> merge.in(i)
  }

  sources.zipWithIndex.foreach { case (s, i) =>
    s ~> merge.in(i)
  }

我承认我不太清楚为什么!无论如何,s.shape 是一个 StatefulMapConcat,这就是它抱怨输出端口已经连接的地方。即使您只传递一个源也会出现问题,因此任意数字不是问题。