使用 Akka 通过 WebSocket 流式传输分页 API 响应

Stream paginated API response over WebSocket using Akka

在服务器端,我正在使用 HTTP API,return 在页面中显示其结果。例如,响应包含 x 个结果,如果超过 0 个,我可以再次调用它以获得下一个 x 个结果。 x 可以任意选择,直到 API.

的最大页面大小

现在我想通过 WebSocket 高效地流式传输全部结果,而不会压倒它(应用背压)。最初我构建了整个结果集,然后从中创建了一个源:

getEventsFuture().foreach { events =>
  sender ! Flow.fromSinkAndSource(Sink.ignore, Source(events))
}

这有效,WebSocket 客户端以其最大速度接收所有事件。这样做的最大缺点是我必须在开始向我的客户发送 return 数据之前获取所有页面。理想情况下,我会使用较小的页面大小,并在客户端连接后立即开始 return 将结果发送给客户端,并在此过程中获取下一页。

所以我需要一个带有源的流,我可以在流具体化后向其添加数据。我尝试为此使用 Source.actorRef

val events = Source.actorRef[Event](1000, OverflowStrategy.fail).mapMaterializedValue { outActor =>
  sendEvents(outActor)
  NotUsed
}

sender ! Flow.fromSinkAndSource(Sink.ignore, events)

本质上,我采用物化的 actorRef 并将所有事件发送给它。每次获取页面时,我都会将结果转储给演员。现在,我对 Source 的初始化可能已经告诉你这并不总是有效。有时,当响应足够大并且客户端不像其他时候那样快速消耗时,套接字连接会关闭。我觉得 OverflowStrategy.fail 是反对丢弃事件的正确策略,因为我不希望客户认为他们得到了一切,如果不是这样的话。

我没有预先为缓冲区设置合理的值,我不想设置 Int.max 或其他东西,因为我认为 Akka 内部确实为缓冲区大小分配了全部内存。

我该如何解决这个问题?我希望所有事件尽可能快地发送给客户端,并像第一个示例一样具有适当的背压。

获取第一页后,我知道总共会有多少结果,因此我可以预先获取一个小页面并将缓冲区大小设置为完整结果大小,但这似乎是一种解决方法。

我发现 unfoldAsync 非常适合这个用例。

Signature

def unfoldAsync[S, E](s: S)(f: S ⇒ Future[Option[(S, E)]]): Source[E, NotUsed]

Description

Just like unfold but the fold function returns a Future which will cause the source to complete or emit when it completes.

emits when there is demand and unfold state returned future completes with some value

completes when the future returned by the unfold function completes with an empty value