Akka Streams: getting the current value of an infinite stream from the outside world
What is the best way to get the current value of an infinite stream that aggregates values and, by definition, never completes?
Source.repeat(1)
.scan(0)(_+_)
.to(Sink.ignore)
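I would like to query the current counter value from Akka HTTP. Should I use a dynamic stream, for example a BroadcastHub, and then subscribe to the infinite stream from the Akka HTTP GET request?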
One solution is to use an actor to hold the state you need. Sink.actorRef wraps an existing actor ref in a sink, for example:
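import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.duration._

class Keeper extends Actor {
  var i: Int = 0 // latest value received from the stream
  override def receive: Receive = {
    case n: Int     => i = n
    case Keeper.Get => sender() ! i
  }
}
object Keeper {
  case object Get
}

implicit val system: ActorSystem = ActorSystem("counter") // assumed setup; also supplies the materializer in Akka 2.6+
implicit val timeout: Timeout = Timeout(1.second)         // required by `?`
val actorRef = system.actorOf(Props(classOf[Keeper]))
// PoisonPill is sent to the actor if the stream ever completes.
// The two-argument Sink.actorRef is deprecated in Akka 2.6, which adds an onFailureMessage parameter.
val q = Source.repeat(1)
  .scan(0)(_ + _)
  .runWith(Sink.actorRef(actorRef, PoisonPill))
val result = (actorRef ? Keeper.Get).mapTo[Int]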
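Note that backpressure is not preserved when using Sink.actorRef: the stream pushes elements as fast as it can produce them, so with a source like Source.repeat the actor's mailbox can grow without bound. This can be improved by using Sink.actorRefWithAck (deprecated since Akka 2.6 in favor of Sink.actorRefWithBackpressure). See the docs for more information.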
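Since the question asks about Akka HTTP, here is a minimal sketch of how a route might query such an actor on each GET request. The name CounterRoute, the path "counter", and the host and port in the usage comment are assumptions for illustration, not part of the original answer; the bind call assumes Akka HTTP 10.2 or later.

```scala
import akka.actor.ActorRef
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.util.Timeout

// Hypothetical helper: builds a route that asks `keeper` for its current
// value on every GET /counter. `getMessage` is whatever query message the
// actor understands (Keeper.Get for the actor shown above).
object CounterRoute {
  def apply(keeper: ActorRef, getMessage: Any)(implicit timeout: Timeout): Route =
    path("counter") {
      get {
        // The ask yields a Future[Any]; mapTo fails the future (and the
        // request) if the actor replies with something other than an Int.
        onSuccess((keeper ? getMessage).mapTo[Int]) { n =>
          complete(n.toString)
        }
      }
    }
}

// Usage sketch, assuming an implicit ActorSystem and Timeout are in scope:
//   import akka.http.scaladsl.Http
//   Http().newServerAt("localhost", 8080).bind(CounterRoute(actorRef, Keeper.Get))
```

With this shape no dynamic stream stage is needed for simple polling: each request reads the actor's latest value, at the cost of one ask round trip per request.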
Another possibility is to use Sink.actorRefWithBackpressure.
Suppose the following actor stores the state coming from the stream:
import akka.actor.{Actor, ActorLogging}
import scala.reflect.ClassTag

object StremState {
  case object Ack
  sealed trait Protocol extends Product with Serializable
  case object StreamInitialized extends Protocol
  case object StreamCompleted extends Protocol
  final case class WriteState[A](value: A) extends Protocol
  final case class StreamFailure(ex: Throwable) extends Protocol
  case object GetState extends Protocol
}

class StremState[A](implicit A: ClassTag[A]) extends Actor with ActorLogging {
  import StremState._
  var state: Option[A] = None // None until the first element arrives
  def receive: Receive = {
    case StreamInitialized =>
      log.info("Stream initialized!")
      sender() ! Ack // ack so the stream starts sending elements
    case StreamCompleted =>
      log.info("Stream completed!")
    case StreamFailure(ex) =>
      log.error(ex, "Stream failed!")
    case WriteState(A(value)) => // the ClassTag extractor checks the runtime type of the payload
      log.info("Received element: {}", value)
      state = Some(value)
      sender() ! Ack // ack so the stream sends the next element
    case GetState =>
      log.info("Fetching state: {}", state)
      sender() ! state
    case other =>
      log.warning("Unexpected message '{}'", other)
  }
}
This actor can then be used as the stream's Sink, as follows:
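import akka.NotUsed
import akka.actor.Props
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.duration._

// Assumes an implicit ActorSystem named `system` is in scope.
implicit val tm: Timeout = Timeout(1.second)
val stream: Source[Int, NotUsed] = Source.repeat(1).scan(0)(_ + _)
val receiver = system.actorOf(Props(new StremState[Int]))
val sink = Sink.actorRefWithBackpressure(
  receiver,
  onInitMessage = StremState.StreamInitialized,
  ackMessage = StremState.Ack,
  onCompleteMessage = StremState.StreamCompleted,
  onFailureMessage = (ex: Throwable) => StremState.StreamFailure(ex)
)
// The classic sink sends each element as-is, so wrap it in the message the actor matches on;
// otherwise the element falls through to `case other`, is never acked, and the stream stalls.
stream.map(StremState.WriteState(_)).runWith(sink)
// Ask the receiver actor for the stream's current state (a Future[Any] holding an Option[Int])
val futureState = receiver ? StremState.GetState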
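The ask above yields an untyped Future[Any], and the actor replies with an Option that is empty until the first element has been written. A minimal sketch of how a caller might narrow and interpret that reply follows; StateReply and its two methods are hypothetical names introduced here for illustration, and the sketch uses only the Scala standard library.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, not part of the original answer or of Akka.
object StateReply {
  // Narrow the untyped ask reply to the Option[Int] the actor is expected
  // to send; any other reply fails the resulting future.
  def narrow(reply: Future[Any])(implicit ec: ExecutionContext): Future[Option[Int]] =
    reply.map {
      case Some(n: Int) => Some(n)
      case None         => None
      case other        => throw new IllegalStateException(s"Unexpected reply: $other")
    }

  // Turn the state into text, distinguishing "nothing received yet".
  def render(state: Option[Int]): String = state match {
    case Some(n) => s"current value: $n"
    case None    => "no element received yet"
  }
}
```

With an implicit ExecutionContext in scope, StateReply.narrow(futureState).map(StateReply.render) would give a Future[String] that could be logged or returned from an HTTP route. Unlike a plain mapTo[Option[Int]], the pattern match also checks the element type, which erasure would otherwise hide.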