Akka persistence receiveRecover 接收来自其他 actor 实例的快照
Akka persistence receiveRecover receives snapshots that are from other actor instances
我在使用 Akka 持久性时遇到意外行为。我是 Akka 的新手,所以如果我错过了一些明显的东西,请提前致歉。
我有一个名为 PCNProcessor 的演员。我为我拥有的每个 PCN id 创建一个 actor 实例。我遇到的问题是,当我创建第一个 actor 实例时,一切正常并且我收到 Processed 响应。但是,当我使用不同的 PCN id 创建更多 PCNProcessor 实例时,我得到 Already processed PCN 响应。
本质上,出于某种原因,作为第一个 PCN id 处理器的一部分存储的快照被重新应用于后续的 PCN id 实例,即使它与该 PCN 无关并且 PCN id 不同。为了确认这种行为,我在 receiveRecover 中打印了一个日志,每个后续的 PCNProcessor 实例都会收到不属于它的快照。
我的问题是:
- 我是否应该以特定方式存储快照,以便根据 PCN Id 键入它们?然后我应该过滤掉上下文中与 PCN 无关的快照吗?
- 或者 Akka 框架应该在幕后处理这个问题,我不必担心根据 PCN id 存储快照。
演员的源代码如下。我确实使用分片。
package com.abc.pcn.core.actors
import java.util.UUID
import akka.actor._
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SnapshotOffer}
import com.abc.common.AutoPassivation
import com.abc.pcn.core.events.{PCNNotProcessedEvt, PCNProcessedEvt}
object PCNProcessor {
import akka.contrib.pattern.ShardRegion
import com.abc.pcn.core.PCN
val shardName = "pcn"
val idExtractor: ShardRegion.IdExtractor = {
case ProcessPCN(pcn) => (pcn.id.toString, ProcessPCN(pcn))
}
val shardResolver: ShardRegion.ShardResolver = {
case ProcessPCN(pcn) => pcn.id.toString
}
// shard settings
def props = Props(classOf[PCNProcessor])
// command and response
case class ProcessPCN(pcn: PCN)
case class NotProcessed(reason: String)
case object Processed
}
class PCNProcessor
extends PersistentActor
with AtLeastOnceDelivery
with AutoPassivation
with ActorLogging {
import com.abc.pcn.core.actors.PCNProcessor._
import scala.concurrent.duration._
context.setReceiveTimeout(10.seconds)
private val pcnId = UUID.fromString(self.path.name)
private var state: String = "not started"
override def persistenceId: String = "pcn-processor-${pcnId.toString}"
override def receiveRecover: Receive = {
case SnapshotOffer(_, s: String) =>
log.info("Recovering. PCN ID: " + pcnId + ", State to restore: " + s)
state = s
}
def receiveCommand: Receive = withPassivation {
case ProcessPCN(pcn)
if state == "processed" =>
sender ! Left(NotProcessed("Already processed PCN"))
case ProcessPCN(pcn)
if pcn.name.isEmpty =>
val error: String = "Name is invalid"
persist(PCNNotProcessedEvt(pcn.id, error)) { evt =>
state = "invalid"
saveSnapshot(state)
sender ! Left(NotProcessed(error))
}
case ProcessPCN(pcn) =>
persist(PCNProcessedEvt(pcn.id)) { evt =>
state = "processed"
saveSnapshot(state)
sender ! Right(Processed)
}
}
}
更新:
注销接收到的快照的元数据后,我可以看到问题是 snapshotterId 没有正确解析并且总是被设置为 pcn-processor-${pcnId.toString } 没有解析斜体的位。
[INFO] [06/06/2015 09:10:00.329] [ECP-akka.actor.default-dispatcher-16] [akka.tcp://ECP@127.0.0.1: 2551/user/sharding/pcn/16b3d4dd-9e0b-45de-8e32-de799d21e7c5]正在恢复。 PCN ID:16b3d4dd-9e0b-45de-8e32-de799d21e7c5,快照元数据SnapshotMetadata(pcn-processor-${pcnId.toString},1,1433577553585)
好的,通过将持久性 ID 更改为以下行来解决此问题:
override def persistenceId: String = "pcn-processor-" + pcnId.toString
原字符串版本:
override def persistenceId: String = "pcn-processor-${pcnId.toString}"
仅适用于持久化日志,不适用于快照。
我认为您误用了 Scala 字符串插值功能。
按以下方式尝试:
override def persistenceId: String = s"pcn-processor-${pcnId.toString}"
请注意在字符串文字前使用 s
。
我在使用 Akka 持久性时遇到意外行为。我是 Akka 的新手,所以如果我错过了一些明显的东西,请提前致歉。
我有一个名为 PCNProcessor 的演员。我为我拥有的每个 PCN id 创建一个 actor 实例。我遇到的问题是,当我创建第一个 actor 实例时,一切正常并且我收到 Processed 响应。但是,当我使用不同的 PCN id 创建更多 PCNProcessor 实例时,我得到 Already processed PCN 响应。
本质上,出于某种原因,作为第一个 PCN id 处理器的一部分存储的快照被重新应用于后续的 PCN id 实例,即使它与该 PCN 无关并且 PCN id 不同。为了确认这种行为,我在 receiveRecover 中打印了一个日志,每个后续的 PCNProcessor 实例都会收到不属于它的快照。
我的问题是:
- 我是否应该以特定方式存储快照,以便根据 PCN Id 键入它们?然后我应该过滤掉上下文中与 PCN 无关的快照吗?
- 或者 Akka 框架应该在幕后处理这个问题,我不必担心根据 PCN id 存储快照。
演员的源代码如下。我确实使用分片。
package com.abc.pcn.core.actors
import java.util.UUID
import akka.actor._
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SnapshotOffer}
import com.abc.common.AutoPassivation
import com.abc.pcn.core.events.{PCNNotProcessedEvt, PCNProcessedEvt}
object PCNProcessor {
import akka.contrib.pattern.ShardRegion
import com.abc.pcn.core.PCN
val shardName = "pcn"
val idExtractor: ShardRegion.IdExtractor = {
case ProcessPCN(pcn) => (pcn.id.toString, ProcessPCN(pcn))
}
val shardResolver: ShardRegion.ShardResolver = {
case ProcessPCN(pcn) => pcn.id.toString
}
// shard settings
def props = Props(classOf[PCNProcessor])
// command and response
case class ProcessPCN(pcn: PCN)
case class NotProcessed(reason: String)
case object Processed
}
class PCNProcessor
extends PersistentActor
with AtLeastOnceDelivery
with AutoPassivation
with ActorLogging {
import com.abc.pcn.core.actors.PCNProcessor._
import scala.concurrent.duration._
context.setReceiveTimeout(10.seconds)
private val pcnId = UUID.fromString(self.path.name)
private var state: String = "not started"
override def persistenceId: String = "pcn-processor-${pcnId.toString}"
override def receiveRecover: Receive = {
case SnapshotOffer(_, s: String) =>
log.info("Recovering. PCN ID: " + pcnId + ", State to restore: " + s)
state = s
}
def receiveCommand: Receive = withPassivation {
case ProcessPCN(pcn)
if state == "processed" =>
sender ! Left(NotProcessed("Already processed PCN"))
case ProcessPCN(pcn)
if pcn.name.isEmpty =>
val error: String = "Name is invalid"
persist(PCNNotProcessedEvt(pcn.id, error)) { evt =>
state = "invalid"
saveSnapshot(state)
sender ! Left(NotProcessed(error))
}
case ProcessPCN(pcn) =>
persist(PCNProcessedEvt(pcn.id)) { evt =>
state = "processed"
saveSnapshot(state)
sender ! Right(Processed)
}
}
}
更新:
注销接收到的快照的元数据后,我可以看到问题是 snapshotterId 没有正确解析并且总是被设置为 pcn-processor-${pcnId.toString } 没有解析斜体的位。
[INFO] [06/06/2015 09:10:00.329] [ECP-akka.actor.default-dispatcher-16] [akka.tcp://ECP@127.0.0.1: 2551/user/sharding/pcn/16b3d4dd-9e0b-45de-8e32-de799d21e7c5]正在恢复。 PCN ID:16b3d4dd-9e0b-45de-8e32-de799d21e7c5,快照元数据SnapshotMetadata(pcn-processor-${pcnId.toString},1,1433577553585)
好的,通过将持久性 ID 更改为以下行来解决此问题:
override def persistenceId: String = "pcn-processor-" + pcnId.toString
原字符串版本:
override def persistenceId: String = "pcn-processor-${pcnId.toString}"
仅适用于持久化日志,不适用于快照。
我认为您误用了 Scala 字符串插值功能。
按以下方式尝试:
override def persistenceId: String = s"pcn-processor-${pcnId.toString}"
请注意在字符串文字前使用 s
。