在 akka-stream 中使用 source 两次
Use source twice with akka-stream
我正在为我构建的 Web 应用程序使用 Play 框架。 Play 2.5 使用 Akka Stream API 允许流式传输 request/response.
我有一个端点,传入的文件直接流式传输到 Google 驱动器。
我定义了一个看起来像这样的 BodyParser
:
BodyParser("toDrive") { request =>
Accumulator.source[ByteString].mapFuture { source =>
Future.successful(Right("Done"))
}
}
我使用源 (Source[ByteString, _]
) 并将其输入 StreamedBody
,我将其与 Play 提供的 WSClient
一起使用。
我想使用给定的 Source
并使用 WSClient
.
进行两个不同的 HTTP 调用
我通过将相同的 Source
传递给两个不同的 WSClient
调用来尝试这种天真的方法,但它失败了。我认为我的问题的解决方案是广播。
我想利用源中的内容来创建 2 个供我的 WSClient
使用的源。
我还在玩 Source
、Flow
和 Sink
。我正在努力弄清楚这一切。
我想 Source
的 .alsoTo()
方法就是您要找的。在内部它只是广播。
更新的解决方案:
Accumulator[ByteString, Either[Result, String]] {
val s1 = Sink
.asPublisher[ByteString](fanout = false)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
//do what you need with source
Future.successful(Right("result 1"))
}
val s2 = Sink
.asPublisher[ByteString](fanout = false)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
//do what you need with source
Future.successful(Right("result 2"))
}
def combine(val1: Future[Either[Result, String]],
val2: Future[Either[Result, String]]): Future[Either[Result, String]] = {
for {
res1 <- val1
res2 <- val2
} yield {
// do something with your result
res1.right.flatMap(val1 => res2.right.map(val2 => val1 + val2))
}
}
Sink.fromGraph(GraphDSL.create(s1, s2)(combine) { implicit b => (sink, sink2) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[ByteString](2))
broadcast ~> sink
broadcast ~> sink2
SinkShape(broadcast.in)
})
}
稍微解释一下(AFAIK)。我创建了 2 个水槽并将它们组合在一个水槽后面。 Accumulator.apply
需要 1 个 Sink[E, Future[A]]
。 BodyParser
迫使我将 ByteString
用作 E
,这是 进入 接收器的 type
数据。
所以 2 个吸收 ByteString
并具体化为 Future[String]
。我将 Sink
转换为 Source
,因为我使用的 API (WsClient) 可以将 Source
作为主体。这个 API 给了我一个 Future[HttpResponse]
(为了解决问题,我将其简化为 Future[String]
但你可以在那里做任何你想做的事。
现在这就是 akka-streams
API 发挥作用的地方。我强烈建议您查看 documentation 以获得更好的理解。话虽如此,在这里,我使用 GraphDSL API 将我的 2 个水槽组合在一个水槽后面。任何进入外露水槽的 ByteString
都会被送入 2 个内部水槽。
注意:有一个方便的 Sink.combine
函数可以获取 n
个流并将它们组合成一个。但是使用这个函数意味着loosing the materialized value(在这种情况下,Future[String]
)
下面提出的原始解决方案无法正常工作。它仅向其中一个来源发送数据。
Play Accumulator
也可以通过给它一个 Sink
来创建。
我使用了这种方法,到目前为止似乎有效:
BodyParser("toDrive") { request =>
def sourceToFut(src: Source): Future[T] = ???
Accumulator[ByteString, Either[Result, T]] {
Sink
.asPublisher[ByteString](fanout = true)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
val upload1Fut = sourceToFut(source)
val upload2Fut = sourceToFut(source)
for {
file1 <- upload1Fut
file2 <- upload2Fut
} yield {
(file1, file2)
}
}
}
}
与我最初的方法相比,唯一有效的变化是我自己创建了 Sink
并允许 fanout
这样我就可以在两个不同的 WSClient
调用中使用源代码两次.
你怎么看@专家?
我正在为我构建的 Web 应用程序使用 Play 框架。 Play 2.5 使用 Akka Stream API 允许流式传输 request/response.
我有一个端点,传入的文件直接流式传输到 Google 驱动器。
我定义了一个看起来像这样的 BodyParser
:
BodyParser("toDrive") { request =>
Accumulator.source[ByteString].mapFuture { source =>
Future.successful(Right("Done"))
}
}
我使用源 (Source[ByteString, _]
) 并将其输入 StreamedBody
,我将其与 Play 提供的 WSClient
一起使用。
我想使用给定的 Source
并使用 WSClient
.
我通过将相同的 Source
传递给两个不同的 WSClient
调用来尝试这种天真的方法,但它失败了。我认为我的问题的解决方案是广播。
我想利用源中的内容来创建 2 个供我的 WSClient
使用的源。
我还在玩 Source
、Flow
和 Sink
。我正在努力弄清楚这一切。
我想 Source
的 .alsoTo()
方法就是您要找的。在内部它只是广播。
更新的解决方案:
Accumulator[ByteString, Either[Result, String]] {
val s1 = Sink
.asPublisher[ByteString](fanout = false)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
//do what you need with source
Future.successful(Right("result 1"))
}
val s2 = Sink
.asPublisher[ByteString](fanout = false)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
//do what you need with source
Future.successful(Right("result 2"))
}
def combine(val1: Future[Either[Result, String]],
val2: Future[Either[Result, String]]): Future[Either[Result, String]] = {
for {
res1 <- val1
res2 <- val2
} yield {
// do something with your result
res1.right.flatMap(val1 => res2.right.map(val2 => val1 + val2))
}
}
Sink.fromGraph(GraphDSL.create(s1, s2)(combine) { implicit b => (sink, sink2) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[ByteString](2))
broadcast ~> sink
broadcast ~> sink2
SinkShape(broadcast.in)
})
}
稍微解释一下(AFAIK)。我创建了 2 个水槽并将它们组合在一个水槽后面。 Accumulator.apply
需要 1 个 Sink[E, Future[A]]
。 BodyParser
迫使我将 ByteString
用作 E
,这是 进入 接收器的 type
数据。
所以 2 个吸收 ByteString
并具体化为 Future[String]
。我将 Sink
转换为 Source
,因为我使用的 API (WsClient) 可以将 Source
作为主体。这个 API 给了我一个 Future[HttpResponse]
(为了解决问题,我将其简化为 Future[String]
但你可以在那里做任何你想做的事。
现在这就是 akka-streams
API 发挥作用的地方。我强烈建议您查看 documentation 以获得更好的理解。话虽如此,在这里,我使用 GraphDSL API 将我的 2 个水槽组合在一个水槽后面。任何进入外露水槽的 ByteString
都会被送入 2 个内部水槽。
注意:有一个方便的 Sink.combine
函数可以获取 n
个流并将它们组合成一个。但是使用这个函数意味着loosing the materialized value(在这种情况下,Future[String]
)
下面提出的原始解决方案无法正常工作。它仅向其中一个来源发送数据。
Play Accumulator
也可以通过给它一个 Sink
来创建。
我使用了这种方法,到目前为止似乎有效:
BodyParser("toDrive") { request =>
def sourceToFut(src: Source): Future[T] = ???
Accumulator[ByteString, Either[Result, T]] {
Sink
.asPublisher[ByteString](fanout = true)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
val upload1Fut = sourceToFut(source)
val upload2Fut = sourceToFut(source)
for {
file1 <- upload1Fut
file2 <- upload2Fut
} yield {
(file1, file2)
}
}
}
}
与我最初的方法相比,唯一有效的变化是我自己创建了 Sink
并允许 fanout
这样我就可以在两个不同的 WSClient
调用中使用源代码两次.
你怎么看@专家?