使用 Akka Http 转换 Slick Streaming 数据并发送 Chunked Response
Transforming Slick Streaming data and sending Chunked Response using Akka Http
目的是从数据库流式传输数据,对这块数据执行一些计算(这种计算 returns 某些情况下的 Future class)并将这些数据作为分块响应发送到用户。目前,我能够在不执行任何计算的情况下传输数据并发送响应。但是,我无法执行此计算然后流式传输结果。
这是我实现的路由
def streamingDB1 =
path("streaming-db1") {
get {
val src = Source.fromPublisher(db.stream(getRds))
complete(src)
}
}
函数 getRds returns 将 table 的行映射到一个案例 class(使用 slick)。现在考虑将每一行作为输入的函数 compute 和 returns 另一个案例 class 的 Future。像
def compute(x: Tweet) : Future[TweetNew] = ?
如何在变量 src 上实现此函数并将此计算的分块响应(作为流)发送给用户。
您可以使用 mapAsync
:
转换源
val src =
Source.fromPublisher(db.stream(getRds))
.mapAsync(parallelism = 3)(compute)
complete(src)
根据需要调整并行度。
请注意,您可能需要配置 Slick documentation 中提到的一些设置:
Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)
(with the desired page size n
) and .transactionally
for proper streaming.
因此,例如,如果您使用的是 PostgreSQL,那么您的 Source
可能如下所示:
val src =
Source.fromPublisher(
db.stream(
getRds.withStatementParameters(
rsType = ResultSetType.ForwardOnly,
rsConcurrency = ResultSetConcurrency.ReadOnly,
fetchSize = 10
).transactionally
)
).mapAsync(parallelism = 3)(compute)
您需要有一种方法来编组 TweetNew,而且如果您发送长度为 0 的块,客户端可能会关闭连接。
此代码适用于 curl:
case class TweetNew(str: String)
def compute(string: String) : Future[TweetNew] = Future {
TweetNew(string)
}
val route = path("hello") {
get {
val byteString: Source[ByteString, NotUsed] = Source.apply(List("t1", "t2", "t3"))
.mapAsync(2)(compute)
.map(tweet => ByteString(tweet.str + "\n"))
complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, byteString))
}
}
目的是从数据库流式传输数据,对这块数据执行一些计算(这种计算 returns 某些情况下的 Future class)并将这些数据作为分块响应发送到用户。目前,我能够在不执行任何计算的情况下传输数据并发送响应。但是,我无法执行此计算然后流式传输结果。
这是我实现的路由
def streamingDB1 =
path("streaming-db1") {
get {
val src = Source.fromPublisher(db.stream(getRds))
complete(src)
}
}
函数 getRds returns 将 table 的行映射到一个案例 class(使用 slick)。现在考虑将每一行作为输入的函数 compute 和 returns 另一个案例 class 的 Future。像
def compute(x: Tweet) : Future[TweetNew] = ?
如何在变量 src 上实现此函数并将此计算的分块响应(作为流)发送给用户。
您可以使用 mapAsync
:
val src =
Source.fromPublisher(db.stream(getRds))
.mapAsync(parallelism = 3)(compute)
complete(src)
根据需要调整并行度。
请注意,您可能需要配置 Slick documentation 中提到的一些设置:
Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both
.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)
(with the desired page sizen
) and.transactionally
for proper streaming.
因此,例如,如果您使用的是 PostgreSQL,那么您的 Source
可能如下所示:
val src =
Source.fromPublisher(
db.stream(
getRds.withStatementParameters(
rsType = ResultSetType.ForwardOnly,
rsConcurrency = ResultSetConcurrency.ReadOnly,
fetchSize = 10
).transactionally
)
).mapAsync(parallelism = 3)(compute)
您需要有一种方法来编组 TweetNew,而且如果您发送长度为 0 的块,客户端可能会关闭连接。
此代码适用于 curl:
case class TweetNew(str: String)
def compute(string: String) : Future[TweetNew] = Future {
TweetNew(string)
}
val route = path("hello") {
get {
val byteString: Source[ByteString, NotUsed] = Source.apply(List("t1", "t2", "t3"))
.mapAsync(2)(compute)
.map(tweet => ByteString(tweet.str + "\n"))
complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, byteString))
}
}