使用 ClusterSharding 在 Akka 集群中存储实体 ID
Storing entity IDs in an Akka cluster using ClusterSharding
我有一个 akka.net 集群,我打算在其中使用 ClusterSharding 跨集群中的节点对实体(参与者)进行分片。位于集群上方的是一个标准 REST 样式 API 层,提供用于创建和检索实体的端点。我还希望能够 return,通过这个 API,集群中当前所有实体的 ID(就像您通常在 REST 风格中 API,例如通过/api/entity 上的 GET)- 这些 ID 将是实体的 属性,或者可以是 ClusterSharding 模块在定向消息时使用的完整 ID。
有没有一种简单的方法可以从集群中检索所有这些 ID?或者我应该创建一个单独的 'caching' actor 来存储 REST API 层可以直接调用的实体创建/恢复时的 ID(在持久性 actor 的情况下)?或者 API 层是否应该在每次被调用时向集群查询 ID(通过向所有实体广播消息并聚合响应)?最后一个似乎有点浪费,尤其是在实体列表相对静态的时候。
还是我遗漏了一些东西,需要以不同的方式设计?
感谢您提供的任何帮助。
如果您只对当前存在的实体感兴趣,您可以简单地向当前分片区域询问其实体 ID:
var region = ClusterSharding.Get(system).ShardRegion(typeName);
var state = await region.Ask<CurrentShardRegionState>(GetShardRegionState.Instance);
foreach (var shard in state.Shards)
foreach(var entityId in shard.EntityIds)
Console.WriteLine($"{typeName}/{shard.ShardId}/{entityId}");
然而,这将仅检索有关生活在当前群集节点上的实体的信息,而不是所有地方。但是,如果需要,您可以汇总结果:
var cluster = Cluster.Get(system);
var region = ClusterSharding.Get(system).ShardRegion(typeName);
var members = cluster.State.Members;
var regions = members
.Select(m => region.Path.ToStringWithAddress(m.Address))
.Select(system.ActorSelection);
// at this point regions variable is a collection of ActorSelections poiting
// to regions on all known nodes. You can use Ask pattern on them directly
请记住,这种请求可能会非常庞大。
虽然这将允许您检索有关特定 typeName 的所有节点上的所有 EntityID 的信息,但它仍然不会计算非活动实体。
如果您想要获取所有节点上所有实体的 ID,则需要对分片模块使用的 ReadJournal 执行查询。这需要满足两个条件:
- 它仅在您的集群分片扩展使用支持 Akka.Persistence.Query.
的持久后端时有效
- 您需要
akka.cluster.sharding.remember-entities = on
设置 - 默认情况下它是关闭的,一旦设置它会增加集群分片基础设施的成本。
如果是 SQL 期刊,它应该大致如下所示:
var queries = PersistenceQuery.Get(actorSystem)
.ReadJournalFor<SqlReadJournal>("path-to-sharding-journal");
var shardPrefix = "/sharding/" + typeName + "Shard/"
Source<string, NotUsed> entitySource = queries
.AllPersistenceIds()
.Where(pid => pid.StartsWith(shardPrefix)) // get all shard IDs ever known
.ConcatMany(shardId => queries.CurrentEventsByPersistenceId(shardId)) // get stream of persisted events for each shard (you may need to stretch the query limit here)
.Collect(env => env.Event as Shard.EntityStarted) // filter out EntityStarted events
.Select(e => e.EntityId); // extract entity ID
using (var mat = system.Materializer())
{
// materialize source into a list
var entityIds = await entitySource
.RunAggregate(ImmutableList<string>.Emtpy, (acc, id) => acc.Add(id), mat);
}
请记住,这种查询也可能非常昂贵。
我有一个 akka.net 集群,我打算在其中使用 ClusterSharding 跨集群中的节点对实体(参与者)进行分片。位于集群上方的是一个标准 REST 样式 API 层,提供用于创建和检索实体的端点。我还希望能够 return,通过这个 API,集群中当前所有实体的 ID(就像您通常在 REST 风格中 API,例如通过/api/entity 上的 GET)- 这些 ID 将是实体的 属性,或者可以是 ClusterSharding 模块在定向消息时使用的完整 ID。
有没有一种简单的方法可以从集群中检索所有这些 ID?或者我应该创建一个单独的 'caching' actor 来存储 REST API 层可以直接调用的实体创建/恢复时的 ID(在持久性 actor 的情况下)?或者 API 层是否应该在每次被调用时向集群查询 ID(通过向所有实体广播消息并聚合响应)?最后一个似乎有点浪费,尤其是在实体列表相对静态的时候。
还是我遗漏了一些东西,需要以不同的方式设计?
感谢您提供的任何帮助。
如果您只对当前存在的实体感兴趣,您可以简单地向当前分片区域询问其实体 ID:
var region = ClusterSharding.Get(system).ShardRegion(typeName);
var state = await region.Ask<CurrentShardRegionState>(GetShardRegionState.Instance);
foreach (var shard in state.Shards)
foreach(var entityId in shard.EntityIds)
Console.WriteLine($"{typeName}/{shard.ShardId}/{entityId}");
然而,这将仅检索有关生活在当前群集节点上的实体的信息,而不是所有地方。但是,如果需要,您可以汇总结果:
var cluster = Cluster.Get(system);
var region = ClusterSharding.Get(system).ShardRegion(typeName);
var members = cluster.State.Members;
var regions = members
.Select(m => region.Path.ToStringWithAddress(m.Address))
.Select(system.ActorSelection);
// at this point regions variable is a collection of ActorSelections poiting
// to regions on all known nodes. You can use Ask pattern on them directly
请记住,这种请求可能会非常庞大。
虽然这将允许您检索有关特定 typeName 的所有节点上的所有 EntityID 的信息,但它仍然不会计算非活动实体。
如果您想要获取所有节点上所有实体的 ID,则需要对分片模块使用的 ReadJournal 执行查询。这需要满足两个条件:
- 它仅在您的集群分片扩展使用支持 Akka.Persistence.Query. 的持久后端时有效
- 您需要
akka.cluster.sharding.remember-entities = on
设置 - 默认情况下它是关闭的,一旦设置它会增加集群分片基础设施的成本。
如果是 SQL 期刊,它应该大致如下所示:
var queries = PersistenceQuery.Get(actorSystem)
.ReadJournalFor<SqlReadJournal>("path-to-sharding-journal");
var shardPrefix = "/sharding/" + typeName + "Shard/"
Source<string, NotUsed> entitySource = queries
.AllPersistenceIds()
.Where(pid => pid.StartsWith(shardPrefix)) // get all shard IDs ever known
.ConcatMany(shardId => queries.CurrentEventsByPersistenceId(shardId)) // get stream of persisted events for each shard (you may need to stretch the query limit here)
.Collect(env => env.Event as Shard.EntityStarted) // filter out EntityStarted events
.Select(e => e.EntityId); // extract entity ID
using (var mat = system.Materializer())
{
// materialize source into a list
var entityIds = await entitySource
.RunAggregate(ImmutableList<string>.Emtpy, (acc, id) => acc.Add(id), mat);
}
请记住,这种查询也可能非常昂贵。