使用 Akka Http 转换 Slick Streaming 数据并发送 Chunked Response

Transforming Slick Streaming data and sending Chunked Response using Akka Http

目的是从数据库流式传输数据,对这块数据执行一些计算(这种计算 returns 某些情况下的 Future class)并将这些数据作为分块响应发送到用户。目前,我能够在不执行任何计算的情况下传输数据并发送响应。但是,我无法执行此计算然后流式传输结果。

这是我实现的路由

def streamingDB1 =
path("streaming-db1") {
  get {
    val src = Source.fromPublisher(db.stream(getRds))
    complete(src)
  }
}

函数 getRds returns 将 table 的行映射到一个案例 class(使用 slick)。现在考虑将每一行作为输入的函数 compute 和 returns 另一个案例 class 的 Future。像

def compute(x: Tweet) : Future[TweetNew] = ?

如何在变量 src 上实现此函数并将此计算的分块响应(作为流)发送给用户。

您可以使用 mapAsync:

转换源
val src =
  Source.fromPublisher(db.stream(getRds))
        .mapAsync(parallelism = 3)(compute)

complete(src)

根据需要调整并行度。


请注意,您可能需要配置 Slick documentation 中提到的一些设置:

Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.

因此,例如,如果您使用的是 PostgreSQL,那么您的 Source 可能如下所示:

val src =
  Source.fromPublisher(
    db.stream(
      getRds.withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10
      ).transactionally
    )
  ).mapAsync(parallelism = 3)(compute)

您需要有一种方法来编组 TweetNew,而且如果您发送长度为 0 的块,客户端可能会关闭连接。

此代码适用于 curl:

case class TweetNew(str: String)

def compute(string: String) : Future[TweetNew] = Future {
  TweetNew(string)
}

val route = path("hello") {
  get {
    val byteString: Source[ByteString, NotUsed] = Source.apply(List("t1", "t2", "t3"))
      .mapAsync(2)(compute)
      .map(tweet => ByteString(tweet.str + "\n"))
    complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, byteString))
  }
}