Akka Streams WebSocket 发送关于任意事件的信息

Akka Streams WebSocket to send info on arbitrary events

我想实现一个服务,其中多个客户端可以使用 WebSocket 连接到服务器。服务器应该能够在任意内部事件上向所有连接的客户端广播消息。到目前为止我有这个代码:

import akka.http.scaladsl.server.RouteResult.route2HandlerFlow
import akka.http.scaladsl.server.Directives._
implicit val system = ActorSystem("Server")
implicit val mat = ActorMaterializer()

// The source to broadcast (just ints for simplicity)
val dataSource = Source(1 to 1000).throttle(1, 1.second, 1, ThrottleMode.Shaping).map(_.toString)

// Go via BroadcastHub to allow multiple clients to connect
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
  dataSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)

val producer: Source[String, NotUsed] = runnableGraph.run()

// Optional - add sink to avoid backpressuring the original flow when no clients are attached
producer.runWith(Sink.ignore)

val wsHandler: Flow[Message, Message, NotUsed] =
  Flow[Message]
    .mapConcat(_ => Nil) // Ignore any data sent from the client
    .merge(producer)  // Stream the data we want to the client
    .map(l => TextMessage(l.toString))

val route =
  path("ws") {
    handleWebSocketMessages(wsHandler)
  }

val port = 8080

println("Starting up route")
Http().bindAndHandle(route2HandlerFlow(route), "127.0.0.1", port)
println(s"Started HTTP server on port $port")

它成功地向连接的客户端广播当前报价。我应该如何修改此代码才能广播任意消息,而不仅仅是预定的报价?

澄清:

"arbitrary messages" 我不是指文件或数据库等其他来源,而是指能够将消息发送到专门的 Source 并将其中继到当前连接的客户端.此类消息可能是某些随时可能发生的内部系统事件的结果。

您只需更改数据源即可。

正在从 csv 文件中获取数据:

val dataSource = FileIO.fromPath(Paths.get("file.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true)
  .map(_.utf8String))

正在从 SQS (Alpakka) 获取数据:

val dataSource = SqsSource(queue, sqsSourceSettings).take(100).map(_.getBody)

使用 Slick (Alpakka) 从 table 获取数据:

val dataSource = Slick.source(sql"SELECT NAME FROM USERS".as[String])

基本上你需要了解三件事:

  • 来源:一个输出
  • 流程:一进一出
  • 接收器:一个输入。

了解这一点,您可以构建线性管道,就像:

source.via(flow1).via(flow2).runWith(sink)

因此,您可以轻松地 "plug" 将源输入到现有管道中,然后 运行 将它们与您想要的任何接收器一起使用:

val pipeline = flow1.via(flow2)

val fileSource = FileIO.fromPath(Paths.get("file.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true)
  .map(_.utf8String))
  .via(pipeline)
  .runWith(sink)

val sqsSource = Slick
  .source(sql"SELECT NAME FROM USERS".as[String])
  .via(pipeline)
  .runWith(sink)

val slickFlow = SqsSource(queue, sqsSourceSettings).take(100)
  .map(_.getBody)
  .via(pipeline)
  .runWith(sink)

编辑:好吧,除了 actorRef 策略,您还可以使用 Source.queue 并通过调用 queue.offer:

来生成您的消息
def source = Source
  .queue(Int.MaxValue, OverflowStrategy.backpressure)
  .map { name: String => s"hello, $name" }
  .toMat(BroadcastHub.sink[String])(Keep.both)
  .run()

def wsHandler(s: Source[String, NotUsed]): Flow[Message, Message, NotUsed] = Flow[Message]
  .mapConcat(_ => Nil)
  .merge(s)
  .map(TextMessage(_))

import scala.concurrent.duration._

val websocketRoute =
  path("greeter" / Segment) { name =>
    val (queue, s) = source

    Source
      .tick(
        initialDelay = 1 second,
        interval = 1 second,
        tick = None
      )
      .map { _ =>
        queue.offer(name)
      }
      .runWith(Sink.ignore)

    handleWebSocketMessages(wsHandler(s))
  }

外部链接:

一个想法是使用 Source.actorRef:

val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
  .toMat(BroadcastHub.sink[String])(Keep.both)
  .run()

val wsHandler: Flow[Message, Message, NotUsed] = Flow[Message]
  .mapConcat(_ => Nil)
  .merge(source)
  .map(l => TextMessage(l.toString))

如果有下游需求,发送到物化 ActorRef 的消息将被发出。如果没有下游需求,则将元素缓冲,如果缓冲区已满,则使用提供的溢出策略。请注意,此方法没有背压。您可以从 Source 发送消息以及任意消息到此参与者:

Source(1 to 1000)
  .throttle(1, 1.second, 1, ThrottleMode.Shaping)
  .map(_.toString)
  .runForeach(msg => actor ! msg)

actor ! "bacon"
actor ! "ribeye"