如何在播放框架控制器中取消akka流

How cancel akka stream within play framework conroller

我有一个流,应该从 http api(开始、停止,只有一个实例)控制。应将响应流式传输到客户端。此处使用播放框架控制器的代码:

  class Processor{

    def job(): Source[Int, NotUsed] ={
      stop()
      Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
    }

    def stop(): Unit ={
      //TODO
    }
  }

  class MyController(process: Processor) {

    def startJob = Action {
      val source = process.job()
      Ok.chunked(source)
    }

    def cancell = Action {
      process.cancel()
      Ok("canceled")
    }
  }

我需要取消工作的能力。当客户端关闭连接时,作业不应该取消——就像日志输出一样。我读到 KillSwitches,但不明白如何将它与接受 Source 的播放控制器一起使用。有帮助吗?

我想我需要一些不同于作业源的输出源。

我用 Monix Observable 来完成我的任务。通过操作,我可以 运行、取消并连接到 运行ning 流。无论如何,我对出于教育目的的 akka-stream 解决方案感兴趣。这里monix解决方案:

class StreamService(implicit ec: Scheduler) {

  private val runningStream: AtomicAny[Option[RunningStream]] = AtomicAny(None)

  def run(): Option[Source[ByteString, NotUsed]] =
    runningStream.get match {
      case None =>
        val observable = Observable
          .interval(1.seconds)
          .map(_.toString)
          .doOnTerminate(cb => runningStream.set(None))
          .doOnSubscriptionCancel(() => runningStream.set(None))
          .publish

        val cancelable_ = observable.connect()

        this.runningStream.set(Some(RunningStream(cancelable_, observable)))
        connect()
      case _ => None
    }

  def connect(): Option[Source[ByteString, NotUsed]] =
    runningStream.get
      .map(rs => rs.observable.toReactivePublisher)
      .map(publisher => Source.fromPublisher(publisher).map(ByteString(_)))

  def cancel(): Unit =
    runningStream.get.foreach(_.cancelable.cancel())

}

object StreamService {
  case class RunningStream(cancelable: Cancelable, observable: ConnectableObservable[String])
}


class SomeController @Inject()(streamService: StreamService, cc: ControllerComponents)
  extends AbstractController(cc) {

  def run() = Action {
    val source = streamService.run().getOrElse(throw new RuntimeException("Stream already running"))
    Ok.chunked(source)
  }

  def connect() = Action {
    val source = streamService.connect().getOrElse(throw new RuntimeException("Stream not running"))
    Ok.chunked(source)
  }

  def cancel() = Action {
    streamService.cancel()
    Ok("ok")
  }
}