Akka.Net 发布和订阅领域事件
Akka.Net Publishing and Subscribing Domain Events
我开始研究 Akka.Net 框架与训练营。
我可以理解基本的 Actor 概念和使用事件溯源的持久性。
我无法理解领域事件将如何被其他参与者发送和接收。
限制为单个系统本地部署的 Actors 和无 DI 容器和
使用 c# /ASP.NET API 我将每个 AgreegateRoot 分离到它自己的项目中
我正在构思类似
的东西
演员经理
- -聚合根
- --ChildActor 1
- --ChildActor 2
- --ChildActor n
- -ValidationActor
Manager Actor 将接收命令消息并通过验证过程,如果通过验证将发送到 AggregateRoot Actor。事件将在根或子 actor 中生成。
请指教:
要从实体内部在类似于事件总线的东西上发布事件,我可以使用以下语法吗?
Context.System.EventStream.Publish(MyEvent);
订阅事件 我理解语法是
System.EventStream.Subscribe( subscriber,MyEvent)
我希望一个 Actor 发布的事件由 Handlers(其他 Actor)处理,其中当前的 AggregateRoot Actor 或里面的子实体应该不知道。
这就是我完全卡住的地方。这是如何实现的?
System.EventStream.Subscribe 中的订阅者是 IActorRef。为此,我需要了解 class.
我应该创建一个启动引导程序,它将引用所有项目/AggregateRoots 并在那里构建对消息类型的订阅吗?
我曾尝试查找博客或文章,但运气不佳。
提前致谢。
I am separating each AgreegateRoot into its own project
我不会为每个 AggregateRoot 都创建一个单独的项目,这对我来说似乎有点过分了。你从中得到什么?您可以只使用一个 class / actor,不需要一个完全独立的项目。
听起来您可能对如何引用表示您的事件的 C# 类型感到困惑,如果每个 AggregateRoot 都有一个单独的项目,这并不奇怪 - 您很快就会 运行 进入循环参考。尝试从单个项目开始,在该项目中使用文件夹分隔有界上下文。在每个文件夹中,创建您需要的任何聚合,以及它们负责的任何事件。这样,所有参与者都可以看到所有事件类型。一旦增长/变得难以管理,您可以考虑将其拆分为单独的项目,大致如下:
- MyApp.BoundedContext1
- MyApp.BoundedContext1.Events
- MyApp.BoundedContext2
- MyApp.BoundedContext2.Events
请注意,您的参与者产生和订阅的事件代表整个系统中的一种 public 合同/API。像上面那样将它们作为单独的 DLL 可以避免循环引用(因为 .Events 项目不引用任何东西)。所以在这个结构中,MyApp.BoundedContext1
可以引用 MyApp.BoundedContext1.Events
并发布它们。 MyApp.BoundedContext2
也可以参考MyApp.BoundedContext1.Events
并订阅。
I want the event published by an Actor to be handled by Handlers (other Actors), of which current AggregateRoot Actor or Child Entity inside should have no knowledge.
This is where I am totally stuck. How is this achieved?
您的发布者不需要订阅者的知识。发布者只是向 EventStream 发布一条消息。演员和事件类型定义的上下文应该相同(即发布者应该 'own' 他们的事件类型)。例如,如果您有一个发布 ThingValidated
事件的 ValidationActor
,它们应该都在相同的上下文中。
subscriber in System.EventStream.Subscribe, is IActorRef. To get this I would need knowledge of the class.
哪个class?订户已经知道自己。您可以只使用 Self
来获得 IActorRef
。如果您指的是 事件 class,请参阅上面有关如何构建项目以引用此事件的信息。
创建 actor 后,它可以注册它感兴趣的任何事件类型 - 如果需要,您可以将此代码放入 actor 本身的初始化代码中,或者如果您有 singleton-style 个 actor ,如果您愿意,可以使用某种引导程序函数。
另一种解决方案是使用 Akka.Cluster.Sharding,这非常适合此用例。然而;我承认集群分片不是初学者的话题。集群分片允许您设置如何通过 props 创建 actor 以及将消息映射到特定 "Entity" 的策略。
这是来自博客 post 我已链接到上面的示例中的演员的集群分片设置示例:
using(var system = ActorSystem.Create("cluster-system"))
{
var sharding = ClusterSharding.Get(system);
var shardRegion = sharding.Start(
typeName: nameof(MyActor),
entityProps: Props.Create<MyActor>(), // the Props used to create entities
settings: ClusterShardingSettings.Create(system),
messageExtractor: new MessageExtractor(maxNumberOfNodes * 10)
);
// ... etc
}
然后您可以像这样发送消息:
region.Tell(new ShardEnvelope("<entity-id>", new MyMessage()));
消息提取器
上面提到的消息提取器是实现这一切的关键组件。消息提取器允许您将消息映射到特定实体(实际上是 entityId)。因此,如果您的消息都包含目标实体的 ID,那么这就变得简单了。这是一个示例消息提取器(再次来自 petabridge 博客 post 和 Akka.Net 代码库):
public sealed class MessageExtractor : HashCodeMessageExtractor
{
public MessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards) { }
public override string EntityId(object message) =>
(message as ShardEnvelope)?.EntityId;
public override object EntityMessage(object message) =>
(message as ShardEnvelope)?.Payload;
}
public sealed class ShardEnvelope
{
public readonly string EntityId;
public readonly object Payload;
public ShardEnvelope(string entityId, object payload)
{
EntityId = entityId;
Payload = payload;
}
}
我承认这似乎有点矫枉过正,我认为如果在 Akka.Net.
中首先 class 支持虚拟演员,这将得到补救
我开始研究 Akka.Net 框架与训练营。
我可以理解基本的 Actor 概念和使用事件溯源的持久性。
我无法理解领域事件将如何被其他参与者发送和接收。
限制为单个系统本地部署的 Actors 和无 DI 容器和 使用 c# /ASP.NET API 我将每个 AgreegateRoot 分离到它自己的项目中
我正在构思类似
的东西演员经理
- -聚合根
- --ChildActor 1
- --ChildActor 2
- --ChildActor n
- -ValidationActor
- -聚合根
Manager Actor 将接收命令消息并通过验证过程,如果通过验证将发送到 AggregateRoot Actor。事件将在根或子 actor 中生成。
请指教:
要从实体内部在类似于事件总线的东西上发布事件,我可以使用以下语法吗?
Context.System.EventStream.Publish(MyEvent);
订阅事件 我理解语法是
System.EventStream.Subscribe( subscriber,MyEvent)
我希望一个 Actor 发布的事件由 Handlers(其他 Actor)处理,其中当前的 AggregateRoot Actor 或里面的子实体应该不知道。
这就是我完全卡住的地方。这是如何实现的?
System.EventStream.Subscribe 中的订阅者是 IActorRef。为此,我需要了解 class.
我应该创建一个启动引导程序,它将引用所有项目/AggregateRoots 并在那里构建对消息类型的订阅吗?
我曾尝试查找博客或文章,但运气不佳。
提前致谢。
I am separating each AgreegateRoot into its own project
我不会为每个 AggregateRoot 都创建一个单独的项目,这对我来说似乎有点过分了。你从中得到什么?您可以只使用一个 class / actor,不需要一个完全独立的项目。
听起来您可能对如何引用表示您的事件的 C# 类型感到困惑,如果每个 AggregateRoot 都有一个单独的项目,这并不奇怪 - 您很快就会 运行 进入循环参考。尝试从单个项目开始,在该项目中使用文件夹分隔有界上下文。在每个文件夹中,创建您需要的任何聚合,以及它们负责的任何事件。这样,所有参与者都可以看到所有事件类型。一旦增长/变得难以管理,您可以考虑将其拆分为单独的项目,大致如下:
- MyApp.BoundedContext1
- MyApp.BoundedContext1.Events
- MyApp.BoundedContext2
- MyApp.BoundedContext2.Events
请注意,您的参与者产生和订阅的事件代表整个系统中的一种 public 合同/API。像上面那样将它们作为单独的 DLL 可以避免循环引用(因为 .Events 项目不引用任何东西)。所以在这个结构中,MyApp.BoundedContext1
可以引用 MyApp.BoundedContext1.Events
并发布它们。 MyApp.BoundedContext2
也可以参考MyApp.BoundedContext1.Events
并订阅。
I want the event published by an Actor to be handled by Handlers (other Actors), of which current AggregateRoot Actor or Child Entity inside should have no knowledge.
This is where I am totally stuck. How is this achieved?
您的发布者不需要订阅者的知识。发布者只是向 EventStream 发布一条消息。演员和事件类型定义的上下文应该相同(即发布者应该 'own' 他们的事件类型)。例如,如果您有一个发布 ThingValidated
事件的 ValidationActor
,它们应该都在相同的上下文中。
subscriber in System.EventStream.Subscribe, is IActorRef. To get this I would need knowledge of the class.
哪个class?订户已经知道自己。您可以只使用 Self
来获得 IActorRef
。如果您指的是 事件 class,请参阅上面有关如何构建项目以引用此事件的信息。
创建 actor 后,它可以注册它感兴趣的任何事件类型 - 如果需要,您可以将此代码放入 actor 本身的初始化代码中,或者如果您有 singleton-style 个 actor ,如果您愿意,可以使用某种引导程序函数。
另一种解决方案是使用 Akka.Cluster.Sharding,这非常适合此用例。然而;我承认集群分片不是初学者的话题。集群分片允许您设置如何通过 props 创建 actor 以及将消息映射到特定 "Entity" 的策略。
这是来自博客 post 我已链接到上面的示例中的演员的集群分片设置示例:
using(var system = ActorSystem.Create("cluster-system"))
{
var sharding = ClusterSharding.Get(system);
var shardRegion = sharding.Start(
typeName: nameof(MyActor),
entityProps: Props.Create<MyActor>(), // the Props used to create entities
settings: ClusterShardingSettings.Create(system),
messageExtractor: new MessageExtractor(maxNumberOfNodes * 10)
);
// ... etc
}
然后您可以像这样发送消息:
region.Tell(new ShardEnvelope("<entity-id>", new MyMessage()));
消息提取器
上面提到的消息提取器是实现这一切的关键组件。消息提取器允许您将消息映射到特定实体(实际上是 entityId)。因此,如果您的消息都包含目标实体的 ID,那么这就变得简单了。这是一个示例消息提取器(再次来自 petabridge 博客 post 和 Akka.Net 代码库):
public sealed class MessageExtractor : HashCodeMessageExtractor
{
public MessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards) { }
public override string EntityId(object message) =>
(message as ShardEnvelope)?.EntityId;
public override object EntityMessage(object message) =>
(message as ShardEnvelope)?.Payload;
}
public sealed class ShardEnvelope
{
public readonly string EntityId;
public readonly object Payload;
public ShardEnvelope(string entityId, object payload)
{
EntityId = entityId;
Payload = payload;
}
}
我承认这似乎有点矫枉过正,我认为如果在 Akka.Net.
中首先 class 支持虚拟演员,这将得到补救