如何在不等待其他方法完成的情况下从服务结构参与者获取状态?

How to get state from service fabric actor without waiting for other methods to complete?

我有一项服务 运行 正在迭代 X 个演员,使用 ActorProxy 询问他们的状态。

对我来说很重要的一点是,此服务不会因等待来自 ect 提醒回调的 actor 中的其他一些长 运行 方法而被阻塞。

有没有什么方法可以调用下面的简单示例 GetState() 以允许该方法以正确的方式完成而不会阻塞某些提醒 运行。

class Actor : IMyActor{

public Task<MyState> GetState() => StateManager.GetAsync("key")
}

另类。

调用服务的正确方法是什么,如果它在 5 秒内没有回复,就包含。

var proxy = ActorProxy.Create<IMyActor();
var state = await proxy.GetState(); // this will wait until the actor is ready to send back the state. 

没有办法做到这一点。演员是single-threaded。如果他们正在做长时间的 运行 工作,他们在任何 actor 方法中等待完成,那么任何其他方法,包括来自外部的方法都必须等待。

即使对于当前正在执行阻塞方法的 Actor,也可以读取 actor 状态。 Actor 使用 IActorStateManager which in turn uses an IActorStateProvider. The IActorStateProvider is instantiated once per ActorService. Each partition instantiates the ActorService that is responsible for hosting and running actors. The actor service is at the core a StatefulService (or rather StatefulServiceBase which is the base class that regular stateful service uses). With this in mind, we can work with the ActorService that caters to our Actors the same way we would work with a regular service, i.e. with a service interface based on IService.

存储他们的状态

IActorStateProvider (Implemented by KvsActorStateProvider 如果你使用的是 Persisted 状态)有两个方法我们可以使用:

Task<T> LoadStateAsync<T>(ActorId actorId, string stateName, CancellationToken cancellationToken = null);
Task<PagedResult<ActorId>> GetActorsAsync(int numItemsToReturn, ContinuationToken continuationToken, CancellationToken cancellationToken);

对这些方法的调用不受参与者锁的影响,这是有道理的,因为它们旨在支持分区上的所有参与者。

示例:

创建自定义 ActorService 并使用它来托管您的演员:

public interface IManyfoldActorService : IService
{
    Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken);
}

public class ManyfoldActorService : ActorService, IManyfoldActorService
{
    ...
}

Program.Main中注册新的ActorService:

ActorRuntime.RegisterActorAsync<ManyfoldActor>(
    (context, actorType) => new ManyfoldActorService(context, actorType)).GetAwaiter().GetResult();

假设我们有一个使用以下方法的简单 Actor:

    Task IManyfoldActor.SetCountAsync(int count, CancellationToken cancellationToken)
    {
        Task.Delay(TimeSpan.FromSeconds(30), cancellationToken).GetAwaiter().GetResult();
        var task = this.StateManager.SetStateAsync("count", count, cancellationToken);
        ActorEventSource.Current.ActorMessage(this, $"Finished set {count} on {this.Id.GetLongId()}");
        return task;
    }

它等待 30 秒(模拟长 运行、阻塞、方法调用)然后将状态值 "count" 设置为 int.

在单独的服务中,我们现在可以为 Actors 调用 SetCountAsync 来生成一些状态数据:

    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
        var actorProxyFactory = new ActorProxyFactory();
        long iterations = 0;
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            iterations += 1;
            var actorId = iterations % 10;
            var count = Environment.TickCount % 100;
            var manyfoldActor = actorProxyFactory.CreateActorProxy<IManyfoldActor>(new ActorId(actorId));
            manyfoldActor.SetCountAsync(count, cancellationToken).ConfigureAwait(false);
            ServiceEventSource.Current.ServiceMessage(this.Context, $"Set count {count} on {actorId} @ {iterations}");
            await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken);
        }
    }

这个方法只是无限循环地改变角色的值。 (注意总共 10 个 actor、延迟 3 秒和 actor 延迟 30 秒之间的相关性。简单地设计这种方式是为了防止 Actor 调用等待锁定的无限累积)。每个调用也作为 fire-and-forget 执行,因此我们可以继续更新那个 returns 之前的下一个 actor 的状态。这是一段愚蠢的代码,它只是这样设计来证明理论。

现在在演员服务中,我们可以像这样实现方法GetCountsAsync

    public async Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken)
    {
        ContinuationToken continuationToken = null;
        var actors = new Dictionary<long, int>();

        do
        {
            var page = await this.StateProvider.GetActorsAsync(100, continuationToken, cancellationToken);

            foreach (var actor in page.Items)
            {
                var count = await this.StateProvider.LoadStateAsync<int>(actor, "count", cancellationToken);
                actors.Add(actor.GetLongId(), count);
            }

            continuationToken = page.ContinuationToken;
        }
        while (continuationToken != null);

        return actors;
    }

这使用底层 ActorStateProvider 查询所有已知的 Actor(对于那个分区),然后直接读取每个这样的状态 'bypassing' Actor 并且不会被 actor 的方法阻塞执行。

最后一部分,一些可以调用我们的 ActorService 并跨所有分区调用 GetCountsAsync 的方法:

    public IDictionary<long, int> Get()
    {
        var applicationName = FabricRuntime.GetActivationContext().ApplicationName;
        var actorServiceName = $"{typeof(IManyfoldActorService).Name.Substring(1)}";
        var actorServiceUri = new Uri($"{applicationName}/{actorServiceName}");

        var fabricClient = new FabricClient();
        var partitions = new List<long>();
        var servicePartitionList = fabricClient.QueryManager.GetPartitionListAsync(actorServiceUri).GetAwaiter().GetResult();
        foreach (var servicePartition in servicePartitionList)
        {
            var partitionInformation = servicePartition.PartitionInformation as Int64RangePartitionInformation;
            partitions.Add(partitionInformation.LowKey);
        }

        var serviceProxyFactory = new ServiceProxyFactory();

        var actors = new Dictionary<long, int>();
        foreach (var partition in partitions)
        {
            var actorService = serviceProxyFactory.CreateServiceProxy<IManyfoldActorService>(actorServiceUri, new ServicePartitionKey(partition));

            var counts = actorService.GetCountsAsync(CancellationToken.None).GetAwaiter().GetResult();
            foreach (var count in counts)
            {
                actors.Add(count.Key, count.Value);
            }
        }
        return actors;
    }

运行 这段代码现在将为我们提供 10 个演员,每 33:d 秒更新一次状态,每个演员每次忙 30 秒。 Actor 服务在每个 actor 方法 returns.

后立即看到更新的状态

此示例中省略了一些内容,例如,当您在 actor 服务中加载状态时,我们可能应该防止超时。

感谢大家的帮助。能够以您的示例为例,并通过一些调整使其正常工作。我遇到的唯一问题是在将数据传回原始应用程序服务时我得到了未知类型。得到

"ArrayOfKeyValueOflonglong is not expected. Add any types not known statically to the list of known types - for example, by using the KnownTypeAttribute attribute or by adding them to the list of known types passed to DataContractSerializer"

所以我将我的 Return 类型的 GetCountsAsync 更改为 List 并且在我的 class 中我使用了 DataContract 和 DataMember class 属性并且它工作正常。似乎从分区中的许多参与者检索状态数据的能力应该是参与者服务的核心部分,您不必创建自定义参与者服务来获取 StateProvider 信息。再次感谢您!