在 MVVM 应用程序中将异步操作排队 运行 一次一个

Queue async operations to have them running one at a time in a MVVM application

我目前正在构建一个 MVVM 应用程序,我的一个视图模型使用了一个通过依赖注入注册的服务。此服务 运行 针对各种第 3 方应用程序的 powershell cmdlet 或 http REST 命令,这些应用程序在同时收到多个请求时不太高兴。

这就是为什么我希望能够从 UI 触发多个操作(不阻止它)但要确保该服务一次只处理一个。我的 UI 元素将同时显示它们是在工作还是在等待。

我尝试实施 TPL ActionBlock 但到目前为止我所有的操作 运行 同时我发现让它们在队列中工作的唯一方法会阻塞 UI 直到所有任务完成.

这是我所做的:

我的视图模型包含一个 ObservableCollection 元素,其中包含两个列表(一个嵌套在另一个列表中)在 UI 上,它看起来像一个可以展开以显示小树视图的项目列表。

我想要的是,每次我展开一个项目时,树视图中的所有子项目都会通过该服务检查它们在第 3 方应用程序中的状态。 UI 子项中的方法如下所示:

private async Task<bool> UpdateSubItemsStatus()
    {
        foreach (var item in connectorsMenuItems)
        {
            await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
        }
        return true;
    }

这里,"parent" 是第一级项目,"parent.Library" 是托管所有内容的主视图模型。

在视图模型上,检索 this 的方法如下:

public async Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
    {
        logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
        //Set requestor UI item in working state in the UI
        if(ci.CIType == EnumCIType.Application)
        {
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
        }

        ActionBlock<OperationType> actionBlock = new ActionBlock<OperationType>(async _operationType =>
        {
            logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
            await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
        });

        actionBlock.Post(operationType);
        actionBlock.Complete();
        actionBlock.Completion.Wait();
    }

然后这里名为“connectorService”的服务完成它的工作。

在最后一行中,如果我依次使用 actionBlock.Completion.Wait() 所有任务 运行,我的 UI 将被阻止。

如果我改用 await actionBlock.Completion()。 UI 未被阻塞,但所有 运行 并行。

所以,如果有人能提供建议,那就太好了!

更新:

我调整了 JSteward 的答案以满足我的需要:

我按照您的建议将 ActionBlock 声明为我的视图模型的私有成员。但是当我按照你说的那样做时,当我扩展一个项目时,它是正确排队的操作,但是如果我扩展另一个项目,那么它的操作(也在他们的队列中)与第一个操作并行 运行ning物品。这不是我希望一次只有一个操作的行为,无论有多少项请求。

所以我做了以下更改: ActionBlock 在视图模型的构造函数中一次性初始化:

public ViewModelCtor()
{
 actionBlock = new ActionBlock<ConnectorOperationArgWrapper>(async _connectorOperationArgWrapper =>
            {
                logManager.WriteLog($"Library : Sending the operation to connector service for {_connectorOperationArgWrapper.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_connectorOperationArgWrapper.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
                LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).StatusString = "Cheking Presence";
                LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).Status = LibraryItemState.UpdatingStatus;
                await connectorService.EnqueueConnectorOperations(_connectorOperationArgWrapper.operationType, _connectorOperationArgWrapper.ci, _connectorOperationArgWrapper.itemPresence.itemId, Settings.Default.ProjectLocalPath + @"\" + _connectorOperationArgWrapper.ci.ID.ToString(), _connectorOperationArgWrapper.itemPresence.ConnectorId, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, _connectorOperationArgWrapper.itemPresence).ConfigureAwait(false);
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
            });
}

因此,被展开的项目调用的方法现在如下所示:

public async Task EnqueueConnectorOperations(ConnectorOperationArgWrapper _args)
    {

        logManager.WriteLog($"Library : Received operation request for {_args.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_args.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);

        if (_args.ci.CIType == EnumCIType.Application)
        {
            LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).IsWorking = true;
            LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).Status = LibraryItemState.NeedsAttention;
            LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).StatusString = "Waiting";
        }

        logManager.WriteLog($"Library : post actionblock", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
        await actionBlock.SendAsync(_args);

        //actionBlock.Complete();
        //await actionBlock.Completion;
    }

我评论了 actionBlock complete 和 completion 的部分,因为我希望块能够随时接收和排队请求,甚至每个项目可能多次。

到目前为止它似乎有效,像我那样做是正确的还是我会遇到一些麻烦?

现在您正在为每个操作创建一个新的 ActionBlockActionBlock 有一个内部队列,您应该向其发送消息并让它 运行 使用单个 ActionBlock 按顺序发送消息。通过重新排列并使 ActionBlock 成为 class 成员,您将能够更好地控制它并等待每组子视图项目。

private ActionBlock<OperationType> actionBlock;

public void OnTreeViewExpand()
{
    //Re-initialize the actionblock for a new set of operations
    actionBlock = new ActionBlock<OperationType>(async _operationType =>
    {
        logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
        await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
    });
}

private async Task<bool> UpdateSubItemsStatus()
{
    foreach (var item in connectorsMenuItems)
    {
        await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
    }

    //All items sent, signal completion
    actionBlock.Complete();
    await actionBlock.Completion;
    return true;
}

public Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
{
    logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
    //Set requestor UI item in working state in the UI
    if (ci.CIType == EnumCIType.Application)
    {
        LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
        LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
        LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
    }
    return actionBlock.SendAsync(operationType);
}

使用BlockingCollection with TaskCompletionSource

    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading.Tasks;

    namespace test
    {
        class Program
        {
            class WorkItem
            {
                public int Id { get; set; } // you can make it store more things
                public TaskCompletionSource<DateTime> TaskSource { get; } = new TaskCompletionSource<DateTime>();
            }

            class Worker : IDisposable
            {
                private BlockingCollection<WorkItem> _queue;
                private Task _consumer;

                public Worker()
                {
                    _queue = new BlockingCollection<WorkItem>();

                    _consumer = Task.Run(async () =>
                    {
                        foreach (var item in _queue.GetConsumingEnumerable())
                        {
                            await Task.Delay(1000); // some hard work
                            item.TaskSource.TrySetResult(DateTime.Now); // try is safer
// you can return whatever you want
                        }
                    });
                }

                public Task<DateTime> DoWork(int i) // return whatever you want
                {
                    var workItem = new WorkItem { Id = i };
                    _queue.Add(workItem);

                    return workItem.TaskSource.Task;
                }
                public void Dispose()
                {
                    _queue.CompleteAdding();
                }
            }

            public static void Main(string[] args)
            {
                using (var worker = new Worker())
                {
                    Task.Run(async () =>
                    {
                        var tasks = Enumerable.Range(0,10).Select(x => worker.DoWork(x)).ToArray();

                        var time = await tasks[1];
                        Console.WriteLine("2nd task finished at " + time);

                        foreach (var task in tasks)
                        {
                            time = await task;
                            Console.WriteLine("Task finished at " + time);
                        }

                        Console.ReadLine();
                    }).Wait();
                }



            }
        }
    }
    // output
    2nd task finished at 2019-01-22 19:14:57
    Task finished at 2019-01-22 19:14:56
    Task finished at 2019-01-22 19:14:57
    Task finished at 2019-01-22 19:14:58
    Task finished at 2019-01-22 19:14:59
    Task finished at 2019-01-22 19:15:00
    Task finished at 2019-01-22 19:15:01
    Task finished at 2019-01-22 19:15:02
    Task finished at 2019-01-22 19:15:03
    Task finished at 2019-01-22 19:15:04
    Task finished at 2019-01-22 19:15:05

这使您可以轻松等待命令中的单个项目,例如不会阻塞 UI 线程。