使用 Akka Streams 进行动态扇出

Dynamic fan out with Akka Streams

我正在构建一个经过几个步骤的 Akka Streams 应用程序。有一个特定的步骤产生 0 个或多个结果,事先不知道有多少。每个结果都必须异步处理(由同一类组件),最后必须合并所有结果。

我应该如何在 Akka Streams 中对其进行建模?我注意到 GraphDsl 有一个 Broadcast 元素,可以让你模拟一个扇出,但这似乎只有在事先知道出口数量的情况下才有可能。 在 Akka Streams 中有没有一种方法可以拥有类似 Broadcast 的东西,但可以扇出到动态数量的网点?

查看此页面中的 Hubshttps://doc.akka.io/docs/akka/current/stream/stream-dynamic.html?language=scala

There are many cases when consumers or producers of a certain service (represented as a Sink, Source, or possibly Flow) are dynamic and not known in advance. The Graph DSL does not allow to represent this, all connections of the graph must be known in advance and must be connected upfront. To allow dynamic fan-in and fan-out streaming, the Hubs should be used.

原来mapConcat如我所愿。这是一个 POC:

package streams

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.util.Random

object StreamsTest extends App {
  implicit val system = ActorSystem("TestSystem")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  case class SplitRequest(s: String)

  def requestHandlerWithMultipleResults(request: SplitRequest): List[String] = 
   request.s.split(" ").toList

  def slowProcessingTask(s: String) =  {
    Thread.sleep(Random.nextInt(5000))
    s.toUpperCase
  }

  val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val source: Source[String, NotUsed] = Source(List(SplitRequest("january february march april may")))
      .mapConcat(requestHandlerWithMultipleResults)
      .mapAsyncUnordered(5)(s => Future(slowProcessingTask(s)))

    val sink = Sink.foreach(println)

    source ~> sink

    ClosedShape
  })

  g.run()
}

输出,例如:

MAY
JANUARY
FEBRUARY
MARCH
APRIL