多路复用 akka 将源流分成两个副本

Multiplex akka streams source into two copies

我正在尝试将流媒体源缓存到磁盘,同时我也将其作为 HttpResponse 发送出去,即我有一个 Source[ByteString,_] 我想交给 [=13] =],但我也想 运行 将相同的数据放入 FileIO.toPath 接收器。

                       |-> FileIO.toPath
Source[ByteString,_] ->|
                       |-> HttpEntity(contentType, Source[ByteString,_]

似乎 Broadcast 是我应该用于扇出的,但根据描述,它写入两个接收器,而 FileIO.toPath 是一个接收器,HttpEntity 期望一个Source.

还有 Source.fromGraph 看起来它会从 GraphStage 创建一个源,例如 Broadcast 阶段,但我不太清楚如何获得 FileIO沉入其中。

您可以使用 alsoTo:

val originalSource: Source[ByteString, _] = ???
val cachedSource: Source[ByteString, _] = originalSource.alsoTo(FileIO.toPath(/*...*/))
val entity = HttpEntity(contentType, cachedSource)