PersistentActor 无法在 Future onComplete 中调用持久处理程序
PersistentActor cannot invoke persist handler in Future onComplete
我是使用 PersistentActor 的新手,
当我尝试从未来的 onComplete 调用 updateState 时,失败,没有任何反应,尝试调试它并且我确实进入了持久调用但没有进入 updateState
trait Event
case class Cmd(data: String)
case class Evt(data: String) extends Event
class BarActor extends PersistentActor{
implicit val system = context.system
implicit val executionContext = system.dispatcher
def updateState(event: Evt): Unit ={
println("Updating state")
state = state.updated(event)
sender() ! state
}
def timeout(implicit ec: ExecutionContext) =
akka.pattern.after(duration = 2 seconds, using = system.scheduler)(Future.failed(new TimeoutException("Got timed out!")))
val receiveCommand: Receive = {
case Cmd(data) =>
def anotherFuture(i: Int)(implicit system: ActorSystem) = {
val realF = Future {
i % 2 match {
case 0 =>
Thread.sleep(100)
case _ =>
Thread.sleep(500)
}
i
}
Future.firstCompletedOf(Seq(realF, timeout))
.recover {
case _ => -1
}
}
val res = (1 to 10).map(anotherFuture(_))
val list = Future.sequence(res)
list.onComplete{
case _ =>
persist(Evt("testing"))(updateState)
}
}
}
你可以试试这个:
list.onComplete {
case _ => self ! Evt("testing")
}
并将其添加到 receiveCommand
case evt: Evt =>
persist(Evt("testing"))(updateStates)
我是使用 PersistentActor 的新手, 当我尝试从未来的 onComplete 调用 updateState 时,失败,没有任何反应,尝试调试它并且我确实进入了持久调用但没有进入 updateState
trait Event
case class Cmd(data: String)
case class Evt(data: String) extends Event
class BarActor extends PersistentActor{
implicit val system = context.system
implicit val executionContext = system.dispatcher
def updateState(event: Evt): Unit ={
println("Updating state")
state = state.updated(event)
sender() ! state
}
def timeout(implicit ec: ExecutionContext) =
akka.pattern.after(duration = 2 seconds, using = system.scheduler)(Future.failed(new TimeoutException("Got timed out!")))
val receiveCommand: Receive = {
case Cmd(data) =>
def anotherFuture(i: Int)(implicit system: ActorSystem) = {
val realF = Future {
i % 2 match {
case 0 =>
Thread.sleep(100)
case _ =>
Thread.sleep(500)
}
i
}
Future.firstCompletedOf(Seq(realF, timeout))
.recover {
case _ => -1
}
}
val res = (1 to 10).map(anotherFuture(_))
val list = Future.sequence(res)
list.onComplete{
case _ =>
persist(Evt("testing"))(updateState)
}
}
}
你可以试试这个:
list.onComplete {
case _ => self ! Evt("testing")
}
并将其添加到 receiveCommand
case evt: Evt =>
persist(Evt("testing"))(updateStates)