Akka persistance custom TCK plugin ,如何支持 Tagged 类型的非 java 序列化?

Akka persistance custom TCK plugin , How to support non java serialization for Tagged type?

我正在使用 Apache ignite 构建 Akka 持久性插件,我对事件标记有疑问,如果我禁用 java 序列化(允许-java-序列化 = no),已标记type 无法正确序列化,因为我正在为事件使用 Protobuf,有没有办法为 Event Tagged wrapper 配置特定的序列化,或者它需要在插件本身中处理? 我的插件的GitHub:https://github.com/Romeh/akka-persistance-ignite

我得到的异常是:

[2018-11-21 21:20:48] [orderManagerSystem-akka.actor.default-dispatcher-27] 错误 a.p.i.journal.IgniteWriteJournal - 尝试使用 Java 序列化序列化消息,同时 akka.actor.allow-java-serialization 被禁用。检查警告日志以获取更多详细信息。 akka.serialization.DisabledJavaSerializer$JavaSerializationException:尝试在禁用 akka.actor.allow-java-serialization 时使用 Java 序列化来序列化消息。检查警告日志以获取更多详细信息。 [2018-11-21 21:20:48] [orderManagerSystem-akka.actor.default-dispatcher-11] 警告 a.s.DisabledJavaSerializer - 传出消息试图使用 Java 序列化,即使 akka.actor.allow-java-serialization = off定了!消息类型为:[class akka.persistence.journal.Tagged]

我有一个示例应用程序,我在使用 Protobuf 进行事件序列化时遇到了这个问题,我是否需要对事件标记类型执行相同的操作?

GitHub URL 示例代码: https://github.com/Romeh/spring-boot-akka-event-sourcing-starter/tree/master/spring-event-sourcing-example

在 Apache ignite 中,它是二进制序列化。

非常感谢您的帮助!

从您的工作流程图来看,Akka 持久性是第一位的,因此您必须说服它按原样将不可序列化的对象传递给 Apache Ignite。

我想你也可以

  • 为 Protobuf 指定一个 akka.serialization.Serializer 实现,它将把 Protobuf 变成 Array[Byte]
  • 指定一个 akka.serialization.Serializer 实现,它将利用 Apache Ignite 的 BinaryMarshaller.marshal(Object)。请注意,Apache Ignite 不会意识到 BinaryObject 正在其缓存中存储,而是将其视为 byte[],除非您添加某种额外的处理。

实际上,在将实际事件存储在日志中之前,通过检查消息类型是否为标记事件已解决此问题,例如:

private JournalItem convert(PersistentRepr p) {
    if (p.payload() instanceof Tagged) {
        Tagged taggedMsg = (Tagged) p.payload();
        PersistentRepr persistentReprWithoutTag = new PersistentImpl(taggedMsg.payload(), p.sequenceNr(), p.persistenceId(), p.manifest(), p.deleted(), p.sender(), p.writerUuid());
        return new JournalItem(persistentReprWithoutTag.sequenceNr(), persistentReprWithoutTag.persistenceId(), serializer.toBinary(persistentReprWithoutTag), JavaConverters.asJavaCollection(taggedMsg.tags()));
    } else {
        return new JournalItem(p.sequenceNr(), p.persistenceId(), serializer.toBinary(p), null);
    }

}