使用 ClusterSharding 在 Akka 集群中存储实体 ID

Storing entity IDs in an Akka cluster using ClusterSharding

我有一个 akka.net 集群,我打算在其中使用 ClusterSharding 跨集群中的节点对实体(参与者)进行分片。位于集群上方的是一个标准 REST 样式 API 层,提供用于创建和检索实体的端点。我还希望能够 return,通过这个 API,集群中当前所有实体的 ID(就像您通常在 REST 风格中 API,例如通过/api/entity 上的 GET)- 这些 ID 将是实体的 属性,或者可以是 ClusterSharding 模块在定向消息时使用的完整 ID。

有没有一种简单的方法可以从集群中检索所有这些 ID?或者我应该创建一个单独的 'caching' actor 来存储 REST API 层可以直接调用的实体创建/恢复时的 ID(在持久性 actor 的情况下)?或者 API 层是否应该在每次被调用时向集群查询 ID(通过向所有实体广播消息并聚合响应)?最后一个似乎有点浪费,尤其是在实体列表相对静态的时候。

还是我遗漏了一些东西,需要以不同的方式设计?

感谢您提供的任何帮助。

如果您只对当前存在的实体感兴趣,您可以简单地向当前分片区域询问其实体 ID:

var region = ClusterSharding.Get(system).ShardRegion(typeName);
var state = await region.Ask<CurrentShardRegionState>(GetShardRegionState.Instance);
foreach (var shard in state.Shards) 
    foreach(var entityId in shard.EntityIds)
        Console.WriteLine($"{typeName}/{shard.ShardId}/{entityId}");

然而,这将仅检索有关生活在当前群集节点上的实体的信息,而不是所有地方。但是,如果需要,您可以汇总结果:

var cluster = Cluster.Get(system);
var region = ClusterSharding.Get(system).ShardRegion(typeName);
var members = cluster.State.Members;
var regions = members
    .Select(m => region.Path.ToStringWithAddress(m.Address))
    .Select(system.ActorSelection);

// at this point regions variable is a collection of ActorSelections poiting
// to regions on all known nodes. You can use Ask pattern on them directly

请记住,这种请求可能会非常庞大​​。

虽然这将允许您检索有关特定 typeName 的所有节点上的所有 EntityID 的信息,但它仍然不会计算非活动实体。

如果您想要获取所有节点上所有实体的 ID,则需要对分片模块使用的 ReadJournal 执行查询。这需要满足两个条件:

  1. 它仅在您的集群分片扩展使用支持 Akka.Persistence.Query.
  2. 的持久后端时有效
  3. 您需要 akka.cluster.sharding.remember-entities = on 设置 - 默认情况下它是关闭的,一旦设置它会增加集群分片基础设施的成本。

如果是 SQL 期刊,它应该大致如下所示:

var queries = PersistenceQuery.Get(actorSystem)
    .ReadJournalFor<SqlReadJournal>("path-to-sharding-journal");

var shardPrefix = "/sharding/" + typeName + "Shard/"
Source<string, NotUsed> entitySource = queries
    .AllPersistenceIds() 
    .Where(pid => pid.StartsWith(shardPrefix)) // get all shard IDs ever known
    .ConcatMany(shardId => queries.CurrentEventsByPersistenceId(shardId)) // get stream of persisted events for each shard (you may need to stretch the query limit here)
    .Collect(env => env.Event as Shard.EntityStarted) // filter out EntityStarted events
    .Select(e => e.EntityId); // extract entity ID

using (var mat = system.Materializer())
{
    // materialize source into a list
    var entityIds = await entitySource
        .RunAggregate(ImmutableList<string>.Emtpy, (acc, id) => acc.Add(id), mat);
}

请记住,这种查询也可能非常昂贵。