Akka Streams WebSocket 发送关于任意事件的信息
Akka Streams WebSocket to send info on arbitrary events
我想实现一个服务,其中多个客户端可以使用 WebSocket 连接到服务器。服务器应该能够在任意内部事件上向所有连接的客户端广播消息。到目前为止我有这个代码:
import akka.http.scaladsl.server.RouteResult.route2HandlerFlow
import akka.http.scaladsl.server.Directives._
implicit val system = ActorSystem("Server")
implicit val mat = ActorMaterializer()
// The source to broadcast (just ints for simplicity)
val dataSource = Source(1 to 1000).throttle(1, 1.second, 1, ThrottleMode.Shaping).map(_.toString)
// Go via BroadcastHub to allow multiple clients to connect
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
dataSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
val producer: Source[String, NotUsed] = runnableGraph.run()
// Optional - add sink to avoid backpressuring the original flow when no clients are attached
producer.runWith(Sink.ignore)
val wsHandler: Flow[Message, Message, NotUsed] =
Flow[Message]
.mapConcat(_ => Nil) // Ignore any data sent from the client
.merge(producer) // Stream the data we want to the client
.map(l => TextMessage(l.toString))
val route =
path("ws") {
handleWebSocketMessages(wsHandler)
}
val port = 8080
println("Starting up route")
Http().bindAndHandle(route2HandlerFlow(route), "127.0.0.1", port)
println(s"Started HTTP server on port $port")
它成功地向连接的客户端广播当前报价。我应该如何修改此代码才能广播任意消息,而不仅仅是预定的报价?
澄清:
"arbitrary messages" 我不是指文件或数据库等其他来源,而是指能够将消息发送到专门的 Source
并将其中继到当前连接的客户端.此类消息可能是某些随时可能发生的内部系统事件的结果。
您只需更改数据源即可。
正在从 csv 文件中获取数据:
val dataSource = FileIO.fromPath(Paths.get("file.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true)
.map(_.utf8String))
正在从 SQS (Alpakka) 获取数据:
val dataSource = SqsSource(queue, sqsSourceSettings).take(100).map(_.getBody)
使用 Slick (Alpakka) 从 table 获取数据:
val dataSource = Slick.source(sql"SELECT NAME FROM USERS".as[String])
基本上你需要了解三件事:
- 来源:一个输出
- 流程:一进一出
- 接收器:一个输入。
了解这一点,您可以构建线性管道,就像:
source.via(flow1).via(flow2).runWith(sink)
因此,您可以轻松地 "plug" 将源输入到现有管道中,然后 运行 将它们与您想要的任何接收器一起使用:
val pipeline = flow1.via(flow2)
val fileSource = FileIO.fromPath(Paths.get("file.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true)
.map(_.utf8String))
.via(pipeline)
.runWith(sink)
val sqsSource = Slick
.source(sql"SELECT NAME FROM USERS".as[String])
.via(pipeline)
.runWith(sink)
val slickFlow = SqsSource(queue, sqsSourceSettings).take(100)
.map(_.getBody)
.via(pipeline)
.runWith(sink)
编辑:好吧,除了 actorRef 策略,您还可以使用 Source.queue 并通过调用 queue.offer:
来生成您的消息
def source = Source
.queue(Int.MaxValue, OverflowStrategy.backpressure)
.map { name: String => s"hello, $name" }
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
def wsHandler(s: Source[String, NotUsed]): Flow[Message, Message, NotUsed] = Flow[Message]
.mapConcat(_ => Nil)
.merge(s)
.map(TextMessage(_))
import scala.concurrent.duration._
val websocketRoute =
path("greeter" / Segment) { name =>
val (queue, s) = source
Source
.tick(
initialDelay = 1 second,
interval = 1 second,
tick = None
)
.map { _ =>
queue.offer(name)
}
.runWith(Sink.ignore)
handleWebSocketMessages(wsHandler(s))
}
外部链接:
一个想法是使用 Source.actorRef
:
val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
val wsHandler: Flow[Message, Message, NotUsed] = Flow[Message]
.mapConcat(_ => Nil)
.merge(source)
.map(l => TextMessage(l.toString))
如果有下游需求,发送到物化 ActorRef
的消息将被发出。如果没有下游需求,则将元素缓冲,如果缓冲区已满,则使用提供的溢出策略。请注意,此方法没有背压。您可以从 Source
发送消息以及任意消息到此参与者:
Source(1 to 1000)
.throttle(1, 1.second, 1, ThrottleMode.Shaping)
.map(_.toString)
.runForeach(msg => actor ! msg)
actor ! "bacon"
actor ! "ribeye"
我想实现一个服务,其中多个客户端可以使用 WebSocket 连接到服务器。服务器应该能够在任意内部事件上向所有连接的客户端广播消息。到目前为止我有这个代码:
import akka.http.scaladsl.server.RouteResult.route2HandlerFlow
import akka.http.scaladsl.server.Directives._
implicit val system = ActorSystem("Server")
implicit val mat = ActorMaterializer()
// The source to broadcast (just ints for simplicity)
val dataSource = Source(1 to 1000).throttle(1, 1.second, 1, ThrottleMode.Shaping).map(_.toString)
// Go via BroadcastHub to allow multiple clients to connect
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
dataSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
val producer: Source[String, NotUsed] = runnableGraph.run()
// Optional - add sink to avoid backpressuring the original flow when no clients are attached
producer.runWith(Sink.ignore)
val wsHandler: Flow[Message, Message, NotUsed] =
Flow[Message]
.mapConcat(_ => Nil) // Ignore any data sent from the client
.merge(producer) // Stream the data we want to the client
.map(l => TextMessage(l.toString))
val route =
path("ws") {
handleWebSocketMessages(wsHandler)
}
val port = 8080
println("Starting up route")
Http().bindAndHandle(route2HandlerFlow(route), "127.0.0.1", port)
println(s"Started HTTP server on port $port")
它成功地向连接的客户端广播当前报价。我应该如何修改此代码才能广播任意消息,而不仅仅是预定的报价?
澄清:
"arbitrary messages" 我不是指文件或数据库等其他来源,而是指能够将消息发送到专门的 Source
并将其中继到当前连接的客户端.此类消息可能是某些随时可能发生的内部系统事件的结果。
您只需更改数据源即可。
正在从 csv 文件中获取数据:
val dataSource = FileIO.fromPath(Paths.get("file.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true)
.map(_.utf8String))
正在从 SQS (Alpakka) 获取数据:
val dataSource = SqsSource(queue, sqsSourceSettings).take(100).map(_.getBody)
使用 Slick (Alpakka) 从 table 获取数据:
val dataSource = Slick.source(sql"SELECT NAME FROM USERS".as[String])
基本上你需要了解三件事:
- 来源:一个输出
- 流程:一进一出
- 接收器:一个输入。
了解这一点,您可以构建线性管道,就像:
source.via(flow1).via(flow2).runWith(sink)
因此,您可以轻松地 "plug" 将源输入到现有管道中,然后 运行 将它们与您想要的任何接收器一起使用:
val pipeline = flow1.via(flow2)
val fileSource = FileIO.fromPath(Paths.get("file.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true)
.map(_.utf8String))
.via(pipeline)
.runWith(sink)
val sqsSource = Slick
.source(sql"SELECT NAME FROM USERS".as[String])
.via(pipeline)
.runWith(sink)
val slickFlow = SqsSource(queue, sqsSourceSettings).take(100)
.map(_.getBody)
.via(pipeline)
.runWith(sink)
编辑:好吧,除了 actorRef 策略,您还可以使用 Source.queue 并通过调用 queue.offer:
来生成您的消息def source = Source
.queue(Int.MaxValue, OverflowStrategy.backpressure)
.map { name: String => s"hello, $name" }
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
def wsHandler(s: Source[String, NotUsed]): Flow[Message, Message, NotUsed] = Flow[Message]
.mapConcat(_ => Nil)
.merge(s)
.map(TextMessage(_))
import scala.concurrent.duration._
val websocketRoute =
path("greeter" / Segment) { name =>
val (queue, s) = source
Source
.tick(
initialDelay = 1 second,
interval = 1 second,
tick = None
)
.map { _ =>
queue.offer(name)
}
.runWith(Sink.ignore)
handleWebSocketMessages(wsHandler(s))
}
外部链接:
一个想法是使用 Source.actorRef
:
val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
val wsHandler: Flow[Message, Message, NotUsed] = Flow[Message]
.mapConcat(_ => Nil)
.merge(source)
.map(l => TextMessage(l.toString))
如果有下游需求,发送到物化 ActorRef
的消息将被发出。如果没有下游需求,则将元素缓冲,如果缓冲区已满,则使用提供的溢出策略。请注意,此方法没有背压。您可以从 Source
发送消息以及任意消息到此参与者:
Source(1 to 1000)
.throttle(1, 1.second, 1, ThrottleMode.Shaping)
.map(_.toString)
.runForeach(msg => actor ! msg)
actor ! "bacon"
actor ! "ribeye"