如何从 Akka 中的 EventStream 创建服务器端事件?

How can I create Server-side events from an EventStream in Akka?

我试过:EventStream -> Source -> Akka HTTP (SSE)

在我看来,这行不通,因为源将由 Akka HTTP complete(Source, ...) 并将消息从 EventStream 发送到物化源,我需要 ActorRef(有没有办法获取 ActorRef?)


我在 GitHub 上找到了一个使用 ActorPublisher 的解决方案: https://github.com/calvinlfer/Akka-HTTP-Akka-Streams-Akka-Actors-Integration

但由于 ActorPublisher 是一个内部 API,我仍然希望有一个干净的解决方案。

您可以按如下方式使用 Source.actorRef to create a Source that converts event stream elements to ServerSentEvent instances, and BroadcastHub.sink

val (sseActor, sseSource) =
  Source.actorRef[EventStreamMessageOrWhatever](10, akka.stream.OverflowStrategy.dropTail)
    .map(s => /* convert event stream elements to ServerSideEvent */)
    .keepAlive(1.second, () => ServerSentEvent.heartbeat)
    .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
    .run()

如果有下游需求,则发送到物化 ActorRef 的消息(即事件流元素)会向下游发出。如果没有下游需求,则按照指定的溢出策略将消息缓存到一定数量(本例中缓存大小为10)。

然后您可以将物化演员订阅到 EventStream:

eventStream.subscribe(sseActor, ...)

并且实体化 Source 可用于您的路径:

path("sse") {
  get {
    complete(sseSource)
  }
}

请注意,此方法没有背压。