如何从官方文档解释这个 Akka 流图?

How to explain this Akka Streams graph from official doc?

我对官方托管的示例代码有几个问题here:

val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)

RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
  (topHS, bottomHS) =>
  import GraphDSL.Implicits._
  val broadcast = builder.add(Broadcast[Int](2))
  Source.single(1) ~> broadcast.in

  broadcast.out(0) ~> sharedDoubler ~> topHS.in
  broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
  ClosedShape
})
  1. 什么时候通过create传入图表?

为什么通过create传入的是topHeadSink, bottomHeadSink,而sharedDoubler不是?它们有什么区别?

  1. 什么时候需要builder.add

我可以在没有builder.add的情况下在图外创建广播吗?如果我在图中添加几个流,我是否也应该通过 builder.add 添加流?有时我们需要 builder.add 有时不需要

,这很令人困惑。

更新

我觉得这仍然令人困惑:

这些方法之间的区别在于,使用 builder.add(...) 导入会忽略导入图的物化值,而通过工厂方法导入则允许包含它。

topHS, bottomHS 是从 create 导入的,因此它们将保留其物化值。如果我做 builder.add(topHS) 怎么办?

您如何解释sharedDoubler:它是否具有物化价值?如果我用 builder.add 会怎样?

  1. GraphDSL.create(topHeadSink, bottomHeadSink)((_, _))((_,_))是什么意思?

看起来像是我们需要的样板文件,但我不确定它是什么。

  1. When do you pass in a graph through create?

当您想要获取传递给 create 工厂方法的图形的物化值时。你问题中RunnableGraph的类型是RunnableGraph[(Future[Int], Future[Int])],意思是图的物化值为(Future[Int], Future[Int]):

val g = RunnableGraph.fromGraph(...).run() // (Future[Int], Future[Int])
val topHeadSinkResult    = g._1 // Future[Int]
val bottomHeadSinkResult = g._2 // Future[Int]

现在考虑以下变体,它定义汇 "inside" 图并丢弃物化值:

val g2 = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val topHeadSink = Sink.head[Int]
  val bottomHeadSink = Sink.head[Int]
  val broadcast = builder.add(Broadcast[Int](2))

  Source.single(1) ~> broadcast.in
  broadcast.out(0) ~> sharedDoubler ~> topHeadSink
  broadcast.out(1) ~> sharedDoubler ~> bottomHeadSink
  ClosedShape
}).run() // NotUsed

g2的值为NotUsed

  1. When do you need builder.add?

图形的所有组件都必须添加到构建器中,但是 ~> 运算符的变体可以添加最常用的组件——例如 SourceFlow--幕后的建造者。但是,执行扇入(例如Merge)或扇出(例如Broadcast)的junction操作必须显式传递给builder.add 如果您使用的是 Graph DSL。

请注意,对于简单图形,您可以使用联结而无需使用图形 DSL。这是来自 documentation:

的示例
val sendRmotely = Sink.actorRef(actorRef, "Done")
val localProcessing = Sink.foreach[Int](_ => /* do something usefull */ ())

val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast[Int](_))

Source(List(0, 1, 2)).runWith(sink)
  1. What does this mean? the ((_,_)) of GraphDSL.create(topHeadSink, bottomHeadSink)((_, _))?

这是一个柯里化参数,用于指定要保留的具体化值。这里使用 ((_, _)) 等同于:

val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((t, b) => (t, b)) {
  implicit builder => (topHS, bottomHS) =>
  ...
}).run() // (Future[Int], Future[Int])

换句话说,((_, _))在此上下文中是shorthand for ((t, b) => (t, b)),它保留了传入的两个接收器的各自物化值。例如,如果,您只想保留 topHeadSink 的物化值,您可以将调用更改为以下内容:

val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((t, _) => t) {
  implicit builder => (topHS, bottomHS) =>
  ...
}).run() // Future[Int]