在 MVVM 应用程序中将异步操作排队 运行 一次一个
Queue async operations to have them running one at a time in a MVVM application
我目前正在构建一个 MVVM 应用程序,我的一个视图模型使用了一个通过依赖注入注册的服务。此服务 运行 针对各种第 3 方应用程序的 powershell cmdlet 或 http REST 命令,这些应用程序在同时收到多个请求时不太高兴。
这就是为什么我希望能够从 UI 触发多个操作(不阻止它)但要确保该服务一次只处理一个。我的 UI 元素将同时显示它们是在工作还是在等待。
我尝试实施 TPL ActionBlock 但到目前为止我所有的操作 运行 同时我发现让它们在队列中工作的唯一方法会阻塞 UI 直到所有任务完成.
这是我所做的:
我的视图模型包含一个 ObservableCollection 元素,其中包含两个列表(一个嵌套在另一个列表中)在 UI 上,它看起来像一个可以展开以显示小树视图的项目列表。
我想要的是,每次我展开一个项目时,树视图中的所有子项目都会通过该服务检查它们在第 3 方应用程序中的状态。
UI 子项中的方法如下所示:
private async Task<bool> UpdateSubItemsStatus()
{
foreach (var item in connectorsMenuItems)
{
await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
}
return true;
}
这里,"parent" 是第一级项目,"parent.Library" 是托管所有内容的主视图模型。
在视图模型上,检索 this 的方法如下:
public async Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
{
logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
//Set requestor UI item in working state in the UI
if(ci.CIType == EnumCIType.Application)
{
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
}
ActionBlock<OperationType> actionBlock = new ActionBlock<OperationType>(async _operationType =>
{
logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
});
actionBlock.Post(operationType);
actionBlock.Complete();
actionBlock.Completion.Wait();
}
然后这里名为“connectorService”的服务完成它的工作。
在最后一行中,如果我依次使用 actionBlock.Completion.Wait() 所有任务 运行,我的 UI 将被阻止。
如果我改用 await actionBlock.Completion()。 UI 未被阻塞,但所有 运行 并行。
所以,如果有人能提供建议,那就太好了!
更新:
我调整了 JSteward 的答案以满足我的需要:
我按照您的建议将 ActionBlock 声明为我的视图模型的私有成员。但是当我按照你说的那样做时,当我扩展一个项目时,它是正确排队的操作,但是如果我扩展另一个项目,那么它的操作(也在他们的队列中)与第一个操作并行 运行ning物品。这不是我希望一次只有一个操作的行为,无论有多少项请求。
所以我做了以下更改:
ActionBlock 在视图模型的构造函数中一次性初始化:
public ViewModelCtor()
{
actionBlock = new ActionBlock<ConnectorOperationArgWrapper>(async _connectorOperationArgWrapper =>
{
logManager.WriteLog($"Library : Sending the operation to connector service for {_connectorOperationArgWrapper.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_connectorOperationArgWrapper.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).StatusString = "Cheking Presence";
LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).Status = LibraryItemState.UpdatingStatus;
await connectorService.EnqueueConnectorOperations(_connectorOperationArgWrapper.operationType, _connectorOperationArgWrapper.ci, _connectorOperationArgWrapper.itemPresence.itemId, Settings.Default.ProjectLocalPath + @"\" + _connectorOperationArgWrapper.ci.ID.ToString(), _connectorOperationArgWrapper.itemPresence.ConnectorId, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, _connectorOperationArgWrapper.itemPresence).ConfigureAwait(false);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
});
}
因此,被展开的项目调用的方法现在如下所示:
public async Task EnqueueConnectorOperations(ConnectorOperationArgWrapper _args)
{
logManager.WriteLog($"Library : Received operation request for {_args.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_args.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
if (_args.ci.CIType == EnumCIType.Application)
{
LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).IsWorking = true;
LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).Status = LibraryItemState.NeedsAttention;
LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).StatusString = "Waiting";
}
logManager.WriteLog($"Library : post actionblock", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
await actionBlock.SendAsync(_args);
//actionBlock.Complete();
//await actionBlock.Completion;
}
我评论了 actionBlock complete 和 completion 的部分,因为我希望块能够随时接收和排队请求,甚至每个项目可能多次。
到目前为止它似乎有效,像我那样做是正确的还是我会遇到一些麻烦?
现在您正在为每个操作创建一个新的 ActionBlock
。 ActionBlock
有一个内部队列,您应该向其发送消息并让它 运行 使用单个 ActionBlock
按顺序发送消息。通过重新排列并使 ActionBlock
成为 class 成员,您将能够更好地控制它并等待每组子视图项目。
private ActionBlock<OperationType> actionBlock;
public void OnTreeViewExpand()
{
//Re-initialize the actionblock for a new set of operations
actionBlock = new ActionBlock<OperationType>(async _operationType =>
{
logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
});
}
private async Task<bool> UpdateSubItemsStatus()
{
foreach (var item in connectorsMenuItems)
{
await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
}
//All items sent, signal completion
actionBlock.Complete();
await actionBlock.Completion;
return true;
}
public Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
{
logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
//Set requestor UI item in working state in the UI
if (ci.CIType == EnumCIType.Application)
{
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
}
return actionBlock.SendAsync(operationType);
}
使用BlockingCollection with TaskCompletionSource
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace test
{
class Program
{
class WorkItem
{
public int Id { get; set; } // you can make it store more things
public TaskCompletionSource<DateTime> TaskSource { get; } = new TaskCompletionSource<DateTime>();
}
class Worker : IDisposable
{
private BlockingCollection<WorkItem> _queue;
private Task _consumer;
public Worker()
{
_queue = new BlockingCollection<WorkItem>();
_consumer = Task.Run(async () =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
await Task.Delay(1000); // some hard work
item.TaskSource.TrySetResult(DateTime.Now); // try is safer
// you can return whatever you want
}
});
}
public Task<DateTime> DoWork(int i) // return whatever you want
{
var workItem = new WorkItem { Id = i };
_queue.Add(workItem);
return workItem.TaskSource.Task;
}
public void Dispose()
{
_queue.CompleteAdding();
}
}
public static void Main(string[] args)
{
using (var worker = new Worker())
{
Task.Run(async () =>
{
var tasks = Enumerable.Range(0,10).Select(x => worker.DoWork(x)).ToArray();
var time = await tasks[1];
Console.WriteLine("2nd task finished at " + time);
foreach (var task in tasks)
{
time = await task;
Console.WriteLine("Task finished at " + time);
}
Console.ReadLine();
}).Wait();
}
}
}
}
// output
2nd task finished at 2019-01-22 19:14:57
Task finished at 2019-01-22 19:14:56
Task finished at 2019-01-22 19:14:57
Task finished at 2019-01-22 19:14:58
Task finished at 2019-01-22 19:14:59
Task finished at 2019-01-22 19:15:00
Task finished at 2019-01-22 19:15:01
Task finished at 2019-01-22 19:15:02
Task finished at 2019-01-22 19:15:03
Task finished at 2019-01-22 19:15:04
Task finished at 2019-01-22 19:15:05
这使您可以轻松等待命令中的单个项目,例如不会阻塞 UI 线程。
我目前正在构建一个 MVVM 应用程序,我的一个视图模型使用了一个通过依赖注入注册的服务。此服务 运行 针对各种第 3 方应用程序的 powershell cmdlet 或 http REST 命令,这些应用程序在同时收到多个请求时不太高兴。
这就是为什么我希望能够从 UI 触发多个操作(不阻止它)但要确保该服务一次只处理一个。我的 UI 元素将同时显示它们是在工作还是在等待。
我尝试实施 TPL ActionBlock 但到目前为止我所有的操作 运行 同时我发现让它们在队列中工作的唯一方法会阻塞 UI 直到所有任务完成.
这是我所做的:
我的视图模型包含一个 ObservableCollection 元素,其中包含两个列表(一个嵌套在另一个列表中)在 UI 上,它看起来像一个可以展开以显示小树视图的项目列表。
我想要的是,每次我展开一个项目时,树视图中的所有子项目都会通过该服务检查它们在第 3 方应用程序中的状态。 UI 子项中的方法如下所示:
private async Task<bool> UpdateSubItemsStatus()
{
foreach (var item in connectorsMenuItems)
{
await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
}
return true;
}
这里,"parent" 是第一级项目,"parent.Library" 是托管所有内容的主视图模型。
在视图模型上,检索 this 的方法如下:
public async Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
{
logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
//Set requestor UI item in working state in the UI
if(ci.CIType == EnumCIType.Application)
{
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
}
ActionBlock<OperationType> actionBlock = new ActionBlock<OperationType>(async _operationType =>
{
logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
});
actionBlock.Post(operationType);
actionBlock.Complete();
actionBlock.Completion.Wait();
}
然后这里名为“connectorService”的服务完成它的工作。
在最后一行中,如果我依次使用 actionBlock.Completion.Wait() 所有任务 运行,我的 UI 将被阻止。
如果我改用 await actionBlock.Completion()。 UI 未被阻塞,但所有 运行 并行。
所以,如果有人能提供建议,那就太好了!
更新:
我调整了 JSteward 的答案以满足我的需要:
我按照您的建议将 ActionBlock 声明为我的视图模型的私有成员。但是当我按照你说的那样做时,当我扩展一个项目时,它是正确排队的操作,但是如果我扩展另一个项目,那么它的操作(也在他们的队列中)与第一个操作并行 运行ning物品。这不是我希望一次只有一个操作的行为,无论有多少项请求。
所以我做了以下更改: ActionBlock 在视图模型的构造函数中一次性初始化:
public ViewModelCtor()
{
actionBlock = new ActionBlock<ConnectorOperationArgWrapper>(async _connectorOperationArgWrapper =>
{
logManager.WriteLog($"Library : Sending the operation to connector service for {_connectorOperationArgWrapper.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_connectorOperationArgWrapper.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).StatusString = "Cheking Presence";
LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).Status = LibraryItemState.UpdatingStatus;
await connectorService.EnqueueConnectorOperations(_connectorOperationArgWrapper.operationType, _connectorOperationArgWrapper.ci, _connectorOperationArgWrapper.itemPresence.itemId, Settings.Default.ProjectLocalPath + @"\" + _connectorOperationArgWrapper.ci.ID.ToString(), _connectorOperationArgWrapper.itemPresence.ConnectorId, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, _connectorOperationArgWrapper.itemPresence).ConfigureAwait(false);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
});
}
因此,被展开的项目调用的方法现在如下所示:
public async Task EnqueueConnectorOperations(ConnectorOperationArgWrapper _args)
{
logManager.WriteLog($"Library : Received operation request for {_args.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_args.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
if (_args.ci.CIType == EnumCIType.Application)
{
LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).IsWorking = true;
LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).Status = LibraryItemState.NeedsAttention;
LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).StatusString = "Waiting";
}
logManager.WriteLog($"Library : post actionblock", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
await actionBlock.SendAsync(_args);
//actionBlock.Complete();
//await actionBlock.Completion;
}
我评论了 actionBlock complete 和 completion 的部分,因为我希望块能够随时接收和排队请求,甚至每个项目可能多次。
到目前为止它似乎有效,像我那样做是正确的还是我会遇到一些麻烦?
现在您正在为每个操作创建一个新的 ActionBlock
。 ActionBlock
有一个内部队列,您应该向其发送消息并让它 运行 使用单个 ActionBlock
按顺序发送消息。通过重新排列并使 ActionBlock
成为 class 成员,您将能够更好地控制它并等待每组子视图项目。
private ActionBlock<OperationType> actionBlock;
public void OnTreeViewExpand()
{
//Re-initialize the actionblock for a new set of operations
actionBlock = new ActionBlock<OperationType>(async _operationType =>
{
logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
});
}
private async Task<bool> UpdateSubItemsStatus()
{
foreach (var item in connectorsMenuItems)
{
await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
}
//All items sent, signal completion
actionBlock.Complete();
await actionBlock.Completion;
return true;
}
public Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
{
logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
//Set requestor UI item in working state in the UI
if (ci.CIType == EnumCIType.Application)
{
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
}
return actionBlock.SendAsync(operationType);
}
使用BlockingCollection with TaskCompletionSource
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace test
{
class Program
{
class WorkItem
{
public int Id { get; set; } // you can make it store more things
public TaskCompletionSource<DateTime> TaskSource { get; } = new TaskCompletionSource<DateTime>();
}
class Worker : IDisposable
{
private BlockingCollection<WorkItem> _queue;
private Task _consumer;
public Worker()
{
_queue = new BlockingCollection<WorkItem>();
_consumer = Task.Run(async () =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
await Task.Delay(1000); // some hard work
item.TaskSource.TrySetResult(DateTime.Now); // try is safer
// you can return whatever you want
}
});
}
public Task<DateTime> DoWork(int i) // return whatever you want
{
var workItem = new WorkItem { Id = i };
_queue.Add(workItem);
return workItem.TaskSource.Task;
}
public void Dispose()
{
_queue.CompleteAdding();
}
}
public static void Main(string[] args)
{
using (var worker = new Worker())
{
Task.Run(async () =>
{
var tasks = Enumerable.Range(0,10).Select(x => worker.DoWork(x)).ToArray();
var time = await tasks[1];
Console.WriteLine("2nd task finished at " + time);
foreach (var task in tasks)
{
time = await task;
Console.WriteLine("Task finished at " + time);
}
Console.ReadLine();
}).Wait();
}
}
}
}
// output
2nd task finished at 2019-01-22 19:14:57
Task finished at 2019-01-22 19:14:56
Task finished at 2019-01-22 19:14:57
Task finished at 2019-01-22 19:14:58
Task finished at 2019-01-22 19:14:59
Task finished at 2019-01-22 19:15:00
Task finished at 2019-01-22 19:15:01
Task finished at 2019-01-22 19:15:02
Task finished at 2019-01-22 19:15:03
Task finished at 2019-01-22 19:15:04
Task finished at 2019-01-22 19:15:05
这使您可以轻松等待命令中的单个项目,例如不会阻塞 UI 线程。