Akka 流从外部世界获取无限流的当前值

Akka stream get current value of an infinite stream from outside world

获取无限流的当前值的最佳方法是什么,该流聚合值并且根据定义永远不会完成

Source.repeat(1)
  .scan(0)(_+_)
  .to(Sink.ignore)

我想从 Akka HTTP 查询当前计数器值。我应该使用动态流吗? broadcastHub 然后从 Akka http 订阅 GET 请求的无限流?

一种解决方案可能是使用 actor 来保持您需要的状态。 Sink.actorRef 会将现有的 actor ref 包装在接收器中,例如

class Keeper extends Actor {
  var i: Int = 0

  override def receive: Receive = {
    case n: Int ⇒ i = n
    case Keeper.Get ⇒ sender ! i
  }
}

object Keeper {
  case object Get
}

val actorRef = system.actorOf(Props(classOf[Keeper]))

val q = Source.repeat(1)
  .scan(0)(_+_)
  .runWith(Sink.actorRef(actorRef, PoisonPill))

val result = (actorRef ? Keeper.Get).mapTo[Int]

请注意,使用 Sink.actorRef 时不会保留背压。这可以通过使用 Sink.actorRefWithAck 来改善。有关更多信息,请参阅 docs.

一种可能是使用 Sink.actorRefWithBackpressure

假设有以下 Actor 来存储来自 Stream 的状态:

object StremState {
  case object Ack
  sealed trait Protocol                         extends Product with Serializable
  case object StreamInitialized                 extends Protocol
  case object StreamCompleted                   extends Protocol
  final case class WriteState[A](value: A)      extends Protocol
  final case class StreamFailure(ex: Throwable) extends Protocol
  final case object GetState                    extends Protocol
}

class StremState[A](implicit A: ClassTag[A]) extends Actor with ActorLogging {
  import StremState._

  var state: Option[A] = None

  def receive: Receive = {
    case StreamInitialized =>
      log.info("Stream initialized!")
      sender() ! Ack // ack to allow the stream to proceed sending more elements

    case StreamCompleted =>
      log.info("Stream completed!")

    case StreamFailure(ex) =>
      log.error(ex, "Stream failed!")

    case WriteState(A(value)) =>
      log.info("Received element: {}", value)
      state = Some(value)
      sender() ! Ack // ack to allow the stream to proceed sending more elements

    case GetState =>
      log.info("Fetching state: {}", state)
      sender() ! state

    case other =>
      log.warning("Unexpected message '{}'", other)

  }
}

此 actor 可以在流的 Sink 中使用,如下所示:

    implicit val tm: Timeout           = Timeout(1.second)
    val stream: Source[Int, NotUsed]   = Source.repeat(1).scan(0)(_+_)

    val receiver = system.actorOf(Props(new StremState[Int]))
    val sink = Sink.actorRefWithBackpressure(
      receiver,
      onInitMessage = StremState.StreamInitialized,
      ackMessage = StremState.Ack,
      onCompleteMessage = StremState.StreamCompleted,
      onFailureMessage = (ex: Throwable) => StremState.StreamFailure(ex)
    )

    stream.runWith(sink)
    // Ask for Stream current state to the receiver Actor
    val futureState = receiver ? GetState