Akka Streams 和 Scala Play 服务器

Akka Streams and Scala Play server

我有一个用 scala play 2.6 编写的服务器

我正在尝试使用 websocket

  1. 收到客户的请求
  2. 处理该请求
  3. 将结果广播给所有客户端,如果正确,则仅将错误广播给 发送请求的客户端 if Left

我现在正在向所有客户端广播消息,有谁知道在错误情况下如何只回复发件人?

  val processFlow = Flow[String].map(process).map(_.toString)

  val (sink, source) = { 
    MergeHub.source[String]
      .via(processFlow)
      .toMat(BroadcastHub.sink[String])(Keep.both)
      .run()
  }

  val websocketFlow = Flow.fromSinkAndSource(sink, source)

  def ws = WebSocket.accept[String, String] { request =>  
    websocketFlow
  }

  def process(message: String): Either[String, String] = { 
    if (message == "error") { // replace with any error condition
      Left ("ERROR " ++ message)
    } else {
      Right (message ++ " processed")
    }   
  }

如果您在流中跟踪发件人,则可以在将收到的消息发送到 websocket 之前对其进行过滤:

case class ProcessResult(senderId: String, result: Either[String, String])

val (sink, source) = { 
  MergeHub.source[ProcessResult]
    .toMat(BroadcastHub.sink[ProcessResult])(Keep.both)
    .run()
}
val websocketFlow = Flow.fromSinkAndSource(sink, source)

def ws = WebSocket.accept[String, String] { request =>
  // create a random id to identify the sender
  val senderId = UUID.randomUUID().toString
  Flow[String]
    .map(process)
    .map(result => ProcessResult(senderId, result))
    // broadcast the result to the other websockets
    .via(websocketFlow)
    // filter the results to only keep the errors for the sender
    .collect {
      case ProcessResult(sender, Left(error)) if sender == senderId => List(error)
      case ProcessResult(_, Left(error)) => List.empty
      case ProcessResult(_, Right(result)) => List(result)
    }.mapConcat(identity)
}

def process(message: String): Either[String, String] = { 
  if (message == "error") { // replace with any error condition
    Left ("ERROR " ++ message)
  } else {
    Right (message ++ " processed")
  }   
}