在 akka-stream 中使用 source 两次

Use source twice with akka-stream

我正在为我构建的 Web 应用程序使用 Play 框架。 Play 2.5 使用 Akka Stream API 允许流式传输 request/response.

我有一个端点,传入的文件直接流式传输到 Google 驱动器。

我定义了一个看起来像这样的 BodyParser

BodyParser("toDrive") { request =>
  Accumulator.source[ByteString].mapFuture { source =>
    Future.successful(Right("Done"))
  }
}

我使用源 (Source[ByteString, _]) 并将其输入 StreamedBody,我将其与 Play 提供的 WSClient 一起使用。

我想使用给定的 Source 并使用 WSClient.

进行两个不同的 HTTP 调用

我通过将相同的 Source 传递给两个不同的 WSClient 调用来尝试这种天真的方法,但它失败了。我认为我的问题的解决方案是广播。

我想利用源中的内容来创建 2 个供我的 WSClient 使用的源。

我还在玩 SourceFlowSink。我正在努力弄清楚这一切。

我想 Source.alsoTo() 方法就是您要找的。在内部它只是广播。

更新的解决方案:

  Accumulator[ByteString, Either[Result, String]] {
    val s1 = Sink
      .asPublisher[ByteString](fanout = false)
      .mapMaterializedValue(Source.fromPublisher)
      .mapMaterializedValue { source =>
        //do what you need with source
        Future.successful(Right("result 1"))
      }
    val s2 = Sink
      .asPublisher[ByteString](fanout = false)
      .mapMaterializedValue(Source.fromPublisher)
      .mapMaterializedValue { source =>
        //do what you need with source
        Future.successful(Right("result 2"))
      }

    def combine(val1: Future[Either[Result, String]],
                val2: Future[Either[Result, String]]): Future[Either[Result, String]] = {
      for {
        res1 <- val1
        res2 <- val2
      } yield {
        // do something with your result
        res1.right.flatMap(val1 => res2.right.map(val2 => val1 + val2))
      }
    }

    Sink.fromGraph(GraphDSL.create(s1, s2)(combine) { implicit b => (sink, sink2) =>
      import GraphDSL.Implicits._

      val broadcast = b.add(Broadcast[ByteString](2))

      broadcast ~> sink
      broadcast ~> sink2

      SinkShape(broadcast.in)
    })
  }

稍微解释一下(AFAIK)。我创建了 2 个水槽并将它们组合在一个水槽后面。 Accumulator.apply 需要 1 个 Sink[E, Future[A]]BodyParser 迫使我将 ByteString 用作 E,这是 进入 接收器的 type 数据。

所以 2 个吸收 ByteString 并具体化为 Future[String]。我将 Sink 转换为 Source,因为我使用的 API (WsClient) 可以将 Source 作为主体。这个 API 给了我一个 Future[HttpResponse] (为了解决问题,我将其简化为 Future[String] 但你可以在那里做任何你想做的事。

现在这就是 akka-streams API 发挥作用的地方。我强烈建议您查看 documentation 以获得更好的理解。话虽如此,在这里,我使用 GraphDSL API 将我的 2 个水槽组合在一个水槽后面。任何进入外露水槽的 ByteString 都会被送入 2 个内部水槽。

注意:有一个方便的 Sink.combine 函数可以获取 n 个流并将它们组合成一个。但是使用这个函数意味着loosing the materialized value(在这种情况下,Future[String]

下面提出的原始解决方案无法正常工作。它仅向其中一个来源发送数据。

Play Accumulator 也可以通过给它一个 Sink 来创建。

我使用了这种方法,到目前为止似乎有效:

BodyParser("toDrive") { request =>
  def sourceToFut(src: Source): Future[T] = ???

  Accumulator[ByteString, Either[Result, T]] {
    Sink
      .asPublisher[ByteString](fanout = true)
      .mapMaterializedValue(Source.fromPublisher)
      .mapMaterializedValue { source =>
        val upload1Fut = sourceToFut(source)
        val upload2Fut = sourceToFut(source)
        for {
          file1 <- upload1Fut
          file2 <- upload2Fut
        } yield {
          (file1, file2)
        }
      }
  }
} 

与我最初的方法相比,唯一有效的变化是我自己创建了 Sink 并允许 fanout 这样我就可以在两个不同的 WSClient 调用中使用源代码两次.

你怎么看@专家?