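How can I merge an arbitrary number of sources in Akka stream?
I have n sources that I'd like to merge by priority in Akka streams. My implementation is based on GraphMergePrioritizedSpec, in which three prioritized sources are merged. I tried to abstract away the number of Sources as follows: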
import akka.NotUsed
import akka.stream.{ClosedShape, Graph, Materializer}
import akka.stream.scaladsl.{GraphDSL, MergePrioritized, RunnableGraph, Sink, Source}
import org.apache.activemq.ActiveMQConnectionFactory

class SourceMerger(
  sources: Seq[Source[java.io.Serializable, NotUsed]],
  priorities: Seq[Int],
  private val sink: Sink[java.io.Serializable, _]
) {
  require(sources.size == priorities.size, "Each source should have a priority")

  import GraphDSL.Implicits._

  private def partial(
    sources: Seq[Source[java.io.Serializable, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[java.io.Serializable, _]
  ): Graph[ClosedShape, NotUsed] = GraphDSL.create() { implicit b =>
    val merge = b.add(MergePrioritized[java.io.Serializable](priorities))
    sources.zipWithIndex.foreach { case (s, i) =>
      s.shape.out ~> merge.in(i)
    }
    merge.out ~> sink
    ClosedShape
  }

  def merge(
    sources: Seq[Source[java.io.Serializable, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[java.io.Serializable, _]
  ): RunnableGraph[NotUsed] = RunnableGraph.fromGraph(partial(sources, priorities, sink))

  def run()(implicit mat: Materializer): NotUsed = merge(sources, priorities, sink).run()(mat)
}
However, I get an error when running the following stub:
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.{Matchers, WordSpecLike}
import akka.testkit.TestKit
import scala.collection.immutable.Iterable

class SourceMergerSpec extends TestKit(ActorSystem("SourceMerger")) with WordSpecLike with Matchers {
  implicit val materializer: Materializer = ActorMaterializer()

  "A SourceMerger" should {
    "merge by priority" in {
      val priorities: Seq[Int] = Seq(1,2,3)
      val highPriority = Iterable("message1", "message2", "message3")
      val mediumPriority = Iterable("message4", "message5", "message6")
      val lowPriority = Iterable("message7", "message8", "message9")
      val source1 = Source[String](highPriority)
      val source2 = Source[String](mediumPriority)
      val source3 = Source[String](lowPriority)
      val sources = Seq(source1, source2, source3)
      val subscriber = Sink.seq[java.io.Serializable]
      val merger = new SourceMerger(sources, priorities, subscriber)
      merger.run()
      source1.runWith(Sink.foreach(println))
    }
  }
}
The relevant stack trace is here:
[StatefulMapConcat.out] is already connected
java.lang.IllegalArgumentException: [StatefulMapConcat.out] is already connected
at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1304)
at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase$class.$tilde$greater(Graph.scala:1431)
at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1521)
at SourceMerger$$anonfun$partial$$anonfun$apply.apply(SourceMerger.scala:26)
at SourceMerger$$anonfun$partial$$anonfun$apply.apply(SourceMerger.scala:25)
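The error seems to come from this part:
sources.zipWithIndex.foreach { case (s, i) =>
  s.shape.out ~> merge.in(i)
}
Is it possible to merge an arbitrary number of Sources in the Akka streams graph DSL? If so, why did my attempt not succeed?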
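Primary problem with the code example
One big issue with the code snippet provided in the question is that source1 is connected both to the Sink from the merge call and to Sink.foreach(println). The same Source cannot be connected to multiple Sinks without an intermediate fan-out element.
Removing the Sink.foreach(println) may outright solve your problem.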
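As an illustration of what an intermediate fan-out element could look like, here is a minimal sketch using `alsoTo`, which attaches a second sink to a stream by broadcasting each element internally. It assumes the same Akka 2.5-style setup as the question (`ActorMaterializer`); the object name `FanOutExample` and the sample messages are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, Sink, Source}

// Hypothetical example object, not part of the question's code.
object FanOutExample extends App {
  implicit val system: ActorSystem = ActorSystem("fan-out-example")
  implicit val materializer: Materializer = ActorMaterializer()

  val source1 = Source(List("message1", "message2", "message3"))

  // alsoTo sends every element to the printing sink as well as downstream,
  // so one stream feeds two sinks within a single materialization.
  val collected = source1
    .alsoTo(Sink.foreach[String](println))
    .toMat(Sink.seq)(Keep.right)
    .run()

  // Shut the actor system down once the stream has completed.
  collected.onComplete(_ => system.terminate())(system.dispatcher)
}
```

For more than two consumers, a `Broadcast` junction inside the graph DSL would be the more general choice.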
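Simplifying the design
Because all messages from a particular Source share the same priority, the merge can be simplified. You can sort the sources by their respective priorities and then concatenate them: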
private def partial(sources: Seq[Source[java.io.Serializable, NotUsed]],
                    priorities: Seq[Int],
                    sink: Sink[java.io.Serializable, _]): RunnableGraph[NotUsed] =
  sources.zip(priorities)
    .sortWith(_._2 < _._2)
    .map(_._1)
    .reduceOption(_ ++ _)
    .getOrElse(Source.empty[java.io.Serializable])
    .to(sink)
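To see how the sort-and-concatenate approach behaves, here is a small self-contained sketch. It assumes an Akka 2.5-style setup like the one in the question; the object name `ConcatByPriority`, the sample messages, and the priority values are all made up for illustration. Note that concatenation only moves on to the next source once the previous one has completed, so this is only suitable for finite sources.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the question's code.
object ConcatByPriority extends App {
  implicit val system: ActorSystem = ActorSystem("concat-by-priority")
  implicit val materializer: Materializer = ActorMaterializer()

  // Sources deliberately listed out of priority order.
  val sources: immutable.Seq[Source[String, NotUsed]] = immutable.Seq(
    Source(List("message7", "message8")),
    Source(List("message1", "message2")),
    Source(List("message4", "message5"))
  )
  val priorities = immutable.Seq(3, 1, 2)

  // Sort ascending by priority number, then chain the sources end to end.
  val ordered: Source[String, NotUsed] =
    sources.zip(priorities)
      .sortBy(_._2)
      .map(_._1)
      .reduceOption(_ ++ _)
      .getOrElse(Source.empty[String])

  // The collected elements should arrive as message1, message2, message4,
  // message5, message7, message8: each source is drained before the next starts.
  val result = Await.result(ordered.runWith(Sink.seq), 3.seconds)
  println(result)

  system.terminate()
}
```

This differs from `MergePrioritized`, which interleaves elements from all inputs and only uses the priorities to weight the choice when several inputs have elements available.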
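Your code runs without error if I replace
sources.zipWithIndex.foreach { case (s, i) =>
  s.shape.out ~> merge.in(i)
}
with
sources.zipWithIndex.foreach { case (s, i) =>
  s ~> merge.in(i)
}
I admit I'm not quite sure why! In any case, s.shape is a StatefulMapConcat, and that is where it complains that the out port is already connected. The problem occurs even if you pass only a single source, so the arbitrary number is not the issue.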
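One plausible explanation, offered as an assumption rather than something confirmed in this thread: `s ~> merge.in(i)` goes through the graph DSL's implicit conversion for sources, which first imports the source into the builder (much like `b.add(s).out`), whereas `s.shape.out` refers to the outlet of the source blueprint itself, which was never imported. Below is a minimal sketch of the working variant, generalized over the element type and keeping the sink's materialized value. The names `MergeByPriorityExample` and `mergeByPriority` and the sample data are hypothetical, and the code assumes the Akka 2.5/2.6 `GraphDSL.create` API used in the question.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, Materializer}
import akka.stream.scaladsl.{GraphDSL, MergePrioritized, RunnableGraph, Sink, Source}
import scala.collection.immutable

// Hypothetical example object, not part of the question's code.
object MergeByPriorityExample extends App {

  def mergeByPriority[T, Mat](
      sources: immutable.Seq[Source[T, NotUsed]],
      priorities: immutable.Seq[Int],
      sink: Sink[T, Mat]
  ): RunnableGraph[Mat] = {
    require(sources.size == priorities.size, "Each source should have a priority")

    // Passing the sink to create() exposes its materialized value on run().
    RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b => out =>
      import GraphDSL.Implicits._

      val merge = b.add(MergePrioritized[T](priorities))
      sources.zipWithIndex.foreach { case (s, i) =>
        // `s ~> ...` imports the source into the builder before wiring it.
        s ~> merge.in(i)
      }
      merge.out ~> out.in
      ClosedShape
    })
  }

  implicit val system: ActorSystem = ActorSystem("merge-by-priority")
  implicit val materializer: Materializer = ActorMaterializer()

  val sources = immutable.Seq(
    Source(List("message1", "message2", "message3")),
    Source(List("message4", "message5", "message6")),
    Source(List("message7", "message8", "message9"))
  )

  // In MergePrioritized, an input with a larger number is picked more often.
  val result = mergeByPriority(sources, immutable.Seq(6, 3, 1), Sink.seq[String]).run()

  // The interleaving is randomized, so the exact order varies between runs.
  result.onComplete { r =>
    println(r)
    system.terminate()
  }(system.dispatcher)
}
```

Because the loop wires one input port per source, the same function works for any number of sources as long as a priority is supplied for each.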