如何在 Actor 中停止 Source.tick?
How to stop Source.tick in the Actor?
我有一个 Actor,它每 2 秒产生一个 NotUsed。这样做也许没有什么实际意义,仅用于测试目的。
import akka.NotUsed
import akka.actor.{Actor, ActorLogging, Props}
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.sweetsoft.FsmSystem.{Add, StartTicker, StopTicker}
import scala.concurrent.duration._
/** Companion object exposing the `Props` factory for [[AddActor]]. */
object AddActor {

  /** Returns a `Props` whose creator instantiates a new [[AddActor]]. */
  def props: Props = {
    val recipe = Props(new AddActor)
    recipe
  }
}
/**
 * Actor that, once it receives `StartTicker`, runs a stream emitting `NotUsed`
 * every 2 seconds and sends `Add` to its parent for each emitted element.
 *
 * `StopTicker` is matched but does nothing: the value returned by `run()` is
 * discarded in the `StartTicker` branch, so there is no handle available here
 * with which the stream could be cancelled.
 */
final class AddActor extends Actor with ActorLogging {

  implicit val materilizer = ActorMaterializer()

  // Blueprint only; nothing runs until `run()` is called in `receive`.
  // Keep.both retains the materialized values of both the source and the sink.
  // NOTE(review): the sink callback reads `context` from inside the stream
  // rather than from the actor's own message handling — confirm this is safe.
  private val tickerGraph =
    Source
      .tick(2.second, 2.second, NotUsed)
      .named("Ticker")
      .toMat(Sink.foreach[NotUsed](_ => context.parent ! Add))(Keep.both)

  override def receive: Receive = {
    case StartTicker =>
      // Every StartTicker materializes a new, independent run of the graph.
      tickerGraph.run()

    case StopTicker =>
      // Intentionally empty: the message is consumed, the stream keeps running.
      ()
  }
}
当 Actor 收到消息 StopTicker 时,我想停止流;之后再收到 StartTicker 时,重新启动流。
调用 run() 方法时,我会得到物化值 Cancellable,但在 StopTicker 分支的作用域内无法访问它。
我该怎么办?
您可以使用 become/unbecome 模式:
/**
 * Actor that drives a 2-second ticker which can be started and stopped.
 *
 * The actor switches between two behaviors with `context.become`:
 *
 *  - `paused` (initial): no stream is running; `StartTicker` materializes the
 *    ticker and switches to `running`.
 *  - `running`: holds the `Cancellable` materialized by `Source.tick`;
 *    `StopTicker` cancels it and switches back to `paused`.
 *
 * Each tick is delivered to `self` as `Add` and forwarded to the parent only
 * while `running`, so the stream callback never touches the actor's context.
 *
 * Streams are run with the materializer created from this actor's context;
 * see the ActorMaterializer documentation for how its lifecycle relates to
 * the actor's.
 */
final class AddActor extends Actor with ActorLogging {

  implicit val materilizer: ActorMaterializer = ActorMaterializer()

  // Reusable blueprint; its materialized value is the Cancellable of the tick source.
  private val ticker =
    Source.tick(2.seconds, 2.seconds, NotUsed).named("Ticker")

  // Initial behavior: nothing is ticking until StartTicker arrives.
  override def receive: Receive = paused

  /** Behavior while no stream is running. */
  private def paused: Receive = {
    case StartTicker =>
      context.become(running(startTicker()))
    case Add =>
      // A tick that was already queued in the mailbox when the stream was
      // cancelled; drop it instead of forwarding it after StopTicker.
      ()
  }

  /**
   * Behavior while the ticker is running.
   *
   * @param handle cancellation handle of the currently running tick stream
   */
  private def running(handle: akka.actor.Cancellable): Receive = {
    case StopTicker =>
      handle.cancel() // actually stops the tick source, not just the forwarding
      context.become(paused)
    case Add =>
      context.parent ! Add
    // StartTicker is deliberately not handled here, so a second stream is
    // never started while one is already running.
  }

  /** Materializes the ticker and returns the handle used to cancel it. */
  private def startTicker(): akka.actor.Cancellable =
    ticker
      .to(Sink.foreach[NotUsed](_ => self ! Add)) // `to` keeps the source's Cancellable
      .run()
}
我有一个 Actor,它每 2 秒产生一个 NotUsed。这样做也许没有什么实际意义,仅用于测试目的。
import akka.NotUsed
import akka.actor.{Actor, ActorLogging, Props}
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.sweetsoft.FsmSystem.{Add, StartTicker, StopTicker}
import scala.concurrent.duration._
/** Companion object exposing the `Props` factory for [[AddActor]]. */
object AddActor {

  /** Returns a `Props` whose creator instantiates a new [[AddActor]]. */
  def props: Props = {
    val recipe = Props(new AddActor)
    recipe
  }
}
/**
 * Actor that, once it receives `StartTicker`, runs a stream emitting `NotUsed`
 * every 2 seconds and sends `Add` to its parent for each emitted element.
 *
 * `StopTicker` is matched but does nothing: the value returned by `run()` is
 * discarded in the `StartTicker` branch, so there is no handle available here
 * with which the stream could be cancelled.
 */
final class AddActor extends Actor with ActorLogging {

  implicit val materilizer = ActorMaterializer()

  // Blueprint only; nothing runs until `run()` is called in `receive`.
  // Keep.both retains the materialized values of both the source and the sink.
  // NOTE(review): the sink callback reads `context` from inside the stream
  // rather than from the actor's own message handling — confirm this is safe.
  private val tickerGraph =
    Source
      .tick(2.second, 2.second, NotUsed)
      .named("Ticker")
      .toMat(Sink.foreach[NotUsed](_ => context.parent ! Add))(Keep.both)

  override def receive: Receive = {
    case StartTicker =>
      // Every StartTicker materializes a new, independent run of the graph.
      tickerGraph.run()

    case StopTicker =>
      // Intentionally empty: the message is consumed, the stream keeps running.
      ()
  }
}
当 Actor 收到消息 StopTicker 时,我想停止流;之后再收到 StartTicker 时,重新启动流。
调用 run() 方法时,我会得到物化值 Cancellable,但在 StopTicker 分支的作用域内无法访问它。
我该怎么办?
您可以使用 become/unbecome 模式:
/**
 * Actor that drives a 2-second ticker which can be started and stopped.
 *
 * The actor switches between two behaviors with `context.become`:
 *
 *  - `paused` (initial): no stream is running; `StartTicker` materializes the
 *    ticker and switches to `running`.
 *  - `running`: holds the `Cancellable` materialized by `Source.tick`;
 *    `StopTicker` cancels it and switches back to `paused`.
 *
 * Each tick is delivered to `self` as `Add` and forwarded to the parent only
 * while `running`, so the stream callback never touches the actor's context.
 *
 * Streams are run with the materializer created from this actor's context;
 * see the ActorMaterializer documentation for how its lifecycle relates to
 * the actor's.
 */
final class AddActor extends Actor with ActorLogging {

  implicit val materilizer: ActorMaterializer = ActorMaterializer()

  // Reusable blueprint; its materialized value is the Cancellable of the tick source.
  private val ticker =
    Source.tick(2.seconds, 2.seconds, NotUsed).named("Ticker")

  // Initial behavior: nothing is ticking until StartTicker arrives.
  override def receive: Receive = paused

  /** Behavior while no stream is running. */
  private def paused: Receive = {
    case StartTicker =>
      context.become(running(startTicker()))
    case Add =>
      // A tick that was already queued in the mailbox when the stream was
      // cancelled; drop it instead of forwarding it after StopTicker.
      ()
  }

  /**
   * Behavior while the ticker is running.
   *
   * @param handle cancellation handle of the currently running tick stream
   */
  private def running(handle: akka.actor.Cancellable): Receive = {
    case StopTicker =>
      handle.cancel() // actually stops the tick source, not just the forwarding
      context.become(paused)
    case Add =>
      context.parent ! Add
    // StartTicker is deliberately not handled here, so a second stream is
    // never started while one is already running.
  }

  /** Materializes the ticker and returns the handle used to cancel it. */
  private def startTicker(): akka.actor.Cancellable =
    ticker
      .to(Sink.foreach[NotUsed](_ => self ! Add)) // `to` keeps the source's Cancellable
      .run()
}