将 SSE 与 Redis pub/sub 和 Akka Streams 一起使用的最简单方法是什么?

What's the simplest way to use SSE with Redis pub/sub and Akka Streams?

我想为以下场景流式传输分块服务器发送的事件:

订阅 Redis 密钥,如果密钥发生变化,则使用 Akka Streams 流式传输新值。它应该只在有新值时流式传输。

据我了解,我需要一个Source。我猜这是对频道的订阅:

redis.subscriber.subscribe("My Channel") {
  case message @ PubSubMessage.Message(channel, messageBytes) => println(
    message.readAs[String]()
  )
  case PubSubMessage.Subscribe(channel, subscribedChannelsCount) => println(
    s"Successfully subscribed to $channel"
  )
}

在我的路线中,我需要从中创建一个 Source,但老实说,我不知道如何开始:

val route: Route =
  path("stream") {
   get {
     complete {
       val source: Source[ServerSentEvent, NotUsed] =
         Source
          .asSubscriber(??) // or fromPublisher???
      .map(_ => {
        ??
      })
      .map(toServerSentEvent)
      .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      .log("stream")
     }
   }

一种方法是使用 Source.actorRef and BroadcastHub.sink:

val (sseActor, sseSource) =
  Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
    .map(toServerSentEvent) // converts a String to a ServerSentEvent
    .keepAlive(1.second, () => ServerSentEvent.heartbeat)
    .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
    .run()

将具体化的 ActorRef 订阅到您的消息通道:发送到此 actor 的消息将向下游发出。如果没有下游需求,则按照指定的溢出策略将消息缓存到一定数量(本例中缓存大小为10)。请注意,这种方法没有背压。

redis.subscriber.subscribe("My Channel") {
  case message @ PubSubMessage.Message(channel, messageBytes) =>
    val strMsg = message.readAs[String]
    println(strMsg)
    sseActor ! strMsg

  case ...
}

另请注意,上面的示例使用了 Source.actorRef[String];根据需要调整类型和示例(例如,它可以是 Source.actorRef[PubSubMessage.Message])。

并且您可以在您的路径中使用实体化 Source

path("stream") {
  get {
    complete(sseSource)
  }
}

另一种方法是创建一个 Source 作为队列,并将元素提供给订阅者回调中收到的队列

val queue =
  Source
    .queue[String](10, OverflowStrategy.dropHead) // drops the oldest element from the buffer to make space for the new element.
    .map(toServerSentEvent) // converts a String to a ServerSentEvent
    .keepAlive(1.second, () => ServerSentEvent.heartbeat)
    .to(Sink.ignore)
    .run()

并在订阅者中

    redis.subscriber.subscribe("My Channel") {
  case message @ PubSubMessage.Message(channel, messageBytes) =>
    val strMsg = message.readAs[String]
    println(strMsg)
    queue.offer(strMsg)

  case ...
}