如何使用 akka 流创建可调用源
How to create a callable source using akka streams
我们有以下架构
SQS(source) -> SQS Pollers -> 我们的业务逻辑 -> 从 SQS 删除消息的 Sink。
这是一个 akka 流(我们的业务逻辑有多个阶段)。
现在我们想通过添加一个 HTTP 服务器(不是 Akka HTTP)来扩展这个架构。
现在我们的服务也有了路径
HTTP 服务器 -> 我们的业务逻辑 -> 完成未来指示 HTTP 响应已完成的接收器。
现在,无论何时收到 HTTP 请求,我都需要一种机制来调用流。
现在 SQS 源本质上是一个长 运行 线程,它调用服务并将消息推送到 akka 流的其余部分。
我实际上是在尝试创建一个 "callable" akka 源,这样只有在我们收到请求时才会触发该源。
我在这里寻找 https://doc.akka.io/docs/akka/2.5/stream/operators/Source/queue.html 作为一个潜在的解决方案,但是只有 returns 一个句柄在整个可运行的图形被具体化之后调用,所以合并SQS 轮询源和 HTTP 可调用源。
我认为 Source.queue
确实是去这里的方式,将流具体化一次,并从您的 HTTP 服务器端点和您的 SQS 轮询器向队列提供元素。有什么特别的原因很难分享吗?
我们有以下架构
SQS(source) -> SQS Pollers -> 我们的业务逻辑 -> 从 SQS 删除消息的 Sink。
这是一个 akka 流(我们的业务逻辑有多个阶段)。
现在我们想通过添加一个 HTTP 服务器(不是 Akka HTTP)来扩展这个架构。
现在我们的服务也有了路径
HTTP 服务器 -> 我们的业务逻辑 -> 完成未来指示 HTTP 响应已完成的接收器。
现在,无论何时收到 HTTP 请求,我都需要一种机制来调用流。
现在 SQS 源本质上是一个长 运行 线程,它调用服务并将消息推送到 akka 流的其余部分。
我实际上是在尝试创建一个 "callable" akka 源,这样只有在我们收到请求时才会触发该源。
我在这里寻找 https://doc.akka.io/docs/akka/2.5/stream/operators/Source/queue.html 作为一个潜在的解决方案,但是只有 returns 一个句柄在整个可运行的图形被具体化之后调用,所以合并SQS 轮询源和 HTTP 可调用源。
我认为 Source.queue
确实是去这里的方式,将流具体化一次,并从您的 HTTP 服务器端点和您的 SQS 轮询器向队列提供元素。有什么特别的原因很难分享吗?