Akka Streams 和 Scala Play 服务器
Akka Streams and Scala Play server
我有一个用 scala play 2.6 编写的服务器
我正在尝试使用 websocket
- 收到客户的请求
- 处理该请求
- 将结果广播给所有客户端,如果正确,则仅将错误广播给
发送请求的客户端 if Left
我现在正在向所有客户端广播消息,有谁知道在错误情况下如何只回复发件人?
val processFlow = Flow[String].map(process).map(_.toString)
val (sink, source) = {
MergeHub.source[String]
.via(processFlow)
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
}
val websocketFlow = Flow.fromSinkAndSource(sink, source)
def ws = WebSocket.accept[String, String] { request =>
websocketFlow
}
def process(message: String): Either[String, String] = {
if (message == "error") { // replace with any error condition
Left ("ERROR " ++ message)
} else {
Right (message ++ " processed")
}
}
如果您在流中跟踪发件人,则可以在将收到的消息发送到 websocket 之前对其进行过滤:
case class ProcessResult(senderId: String, result: Either[String, String])
val (sink, source) = {
MergeHub.source[ProcessResult]
.toMat(BroadcastHub.sink[ProcessResult])(Keep.both)
.run()
}
val websocketFlow = Flow.fromSinkAndSource(sink, source)
def ws = WebSocket.accept[String, String] { request =>
// create a random id to identify the sender
val senderId = UUID.randomUUID().toString
Flow[String]
.map(process)
.map(result => ProcessResult(senderId, result))
// broadcast the result to the other websockets
.via(websocketFlow)
// filter the results to only keep the errors for the sender
.collect {
case ProcessResult(sender, Left(error)) if sender == senderId => List(error)
case ProcessResult(_, Left(error)) => List.empty
case ProcessResult(_, Right(result)) => List(result)
}.mapConcat(identity)
}
def process(message: String): Either[String, String] = {
if (message == "error") { // replace with any error condition
Left ("ERROR " ++ message)
} else {
Right (message ++ " processed")
}
}
我有一个用 scala play 2.6 编写的服务器
我正在尝试使用 websocket
- 收到客户的请求
- 处理该请求
- 将结果广播给所有客户端,如果正确,则仅将错误广播给 发送请求的客户端 if Left
我现在正在向所有客户端广播消息,有谁知道在错误情况下如何只回复发件人?
val processFlow = Flow[String].map(process).map(_.toString)
val (sink, source) = {
MergeHub.source[String]
.via(processFlow)
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
}
val websocketFlow = Flow.fromSinkAndSource(sink, source)
def ws = WebSocket.accept[String, String] { request =>
websocketFlow
}
def process(message: String): Either[String, String] = {
if (message == "error") { // replace with any error condition
Left ("ERROR " ++ message)
} else {
Right (message ++ " processed")
}
}
如果您在流中跟踪发件人,则可以在将收到的消息发送到 websocket 之前对其进行过滤:
case class ProcessResult(senderId: String, result: Either[String, String])
val (sink, source) = {
MergeHub.source[ProcessResult]
.toMat(BroadcastHub.sink[ProcessResult])(Keep.both)
.run()
}
val websocketFlow = Flow.fromSinkAndSource(sink, source)
def ws = WebSocket.accept[String, String] { request =>
// create a random id to identify the sender
val senderId = UUID.randomUUID().toString
Flow[String]
.map(process)
.map(result => ProcessResult(senderId, result))
// broadcast the result to the other websockets
.via(websocketFlow)
// filter the results to only keep the errors for the sender
.collect {
case ProcessResult(sender, Left(error)) if sender == senderId => List(error)
case ProcessResult(_, Left(error)) => List.empty
case ProcessResult(_, Right(result)) => List(result)
}.mapConcat(identity)
}
def process(message: String): Either[String, String] = {
if (message == "error") { // replace with any error condition
Left ("ERROR " ++ message)
} else {
Right (message ++ " processed")
}
}