多路复用 akka 将源流分成两个副本
Multiplex akka streams source into two copies
我正在尝试将流媒体源缓存到磁盘,同时我也将其作为 HttpResponse
发送出去,即我有一个 Source[ByteString,_]
我想交给 [=13] =],但我也想 运行 将相同的数据放入 FileIO.toPath
接收器。
|-> FileIO.toPath
Source[ByteString,_] ->|
|-> HttpEntity(contentType, Source[ByteString,_]
似乎 Broadcast
是我应该用于扇出的,但根据描述,它写入两个接收器,而 FileIO.toPath
是一个接收器,HttpEntity
期望一个Source
.
还有 Source.fromGraph
看起来它会从 GraphStage 创建一个源,例如 Broadcast
阶段,但我不太清楚如何获得 FileIO
沉入其中。
您可以使用 alsoTo
:
val originalSource: Source[ByteString, _] = ???
val cachedSource: Source[ByteString, _] = originalSource.alsoTo(FileIO.toPath(/*...*/))
val entity = HttpEntity(contentType, cachedSource)
我正在尝试将流媒体源缓存到磁盘,同时我也将其作为 HttpResponse
发送出去,即我有一个 Source[ByteString,_]
我想交给 [=13] =],但我也想 运行 将相同的数据放入 FileIO.toPath
接收器。
|-> FileIO.toPath
Source[ByteString,_] ->|
|-> HttpEntity(contentType, Source[ByteString,_]
似乎 Broadcast
是我应该用于扇出的,但根据描述,它写入两个接收器,而 FileIO.toPath
是一个接收器,HttpEntity
期望一个Source
.
还有 Source.fromGraph
看起来它会从 GraphStage 创建一个源,例如 Broadcast
阶段,但我不太清楚如何获得 FileIO
沉入其中。
您可以使用 alsoTo
:
val originalSource: Source[ByteString, _] = ???
val cachedSource: Source[ByteString, _] = originalSource.alsoTo(FileIO.toPath(/*...*/))
val entity = HttpEntity(contentType, cachedSource)