运行 自定义调度程序上的任务<T>
Running Task<T> on a custom scheduler
我正在创建一个通用助手 class,它将帮助确定对 API 发出的请求的优先级,同时限制它们发生的并行化。
考虑下面应用的关键方法;
public IQueuedTaskHandle<TResponse> InvokeRequest<TResponse>(Func<TClient, Task<TResponse>> invocation, QueuedClientPriority priority, CancellationToken ct) where TResponse : IServiceResponse
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
_logger.Debug("Queueing task.");
var taskToQueue = Task.Factory.StartNew(async () =>
{
_logger.Debug("Starting request {0}", Task.CurrentId);
return await invocation(_client);
}, cts.Token, TaskCreationOptions.None, _schedulers[priority]).Unwrap();
taskToQueue.ContinueWith(task => _logger.Debug("Finished task {0}", task.Id), cts.Token);
return new EcosystemQueuedTaskHandle<TResponse>(cts, priority, taskToQueue);
}
无需过多赘述,我想在轮到队列中时调用 Task<TResponse>> invocation
返回的任务。我正在使用由 QueuedTaskScheduler
构造的队列集合,由唯一枚举索引;
_queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 3);
_schedulers = new Dictionary<QueuedClientPriority, TaskScheduler>();
//Enumerate the priorities
foreach (var priority in Enum.GetValues(typeof(QueuedClientPriority)))
{
_schedulers.Add((QueuedClientPriority)priority, _queuedTaskScheduler.ActivateNewQueue((int)priority));
}
然而,我无法在有限的并行环境中执行任务,但收效甚微,导致 100 个 API 请求在一个大批量中构建、触发和完成。我可以使用 Fiddler 会话来说明这一点;
我已经阅读了一些有趣的文章和 SO 帖子 (here, here and here),我认为它们会详细说明如何解决这个问题,但到目前为止我还没有弄明白。据我了解,lambda 的 async
性质按照设计在连续结构中工作,这将生成的任务标记为已完成,基本上 "insta-completing" 它。这意味着虽然队列工作正常,但在自定义调度程序上运行生成的 Task<T>
却成了问题所在。
This means that whilst the queues are working fine, runing a generated Task on a custom scheduler is turning out to be the problem.
正确。一种思考方式[1] 是将 async
方法拆分为多个任务 - 它在每个 await
点分解。这些 "sub-tasks" 中的每一个都在任务计划程序上 运行。因此,async
方法 将 运行 完全在任务调度程序上(假设您不使用 ConfigureAwait(false)
),但在每个 await
它将 离开 任务调度程序,然后在 await
完成后 重新进入 该任务调度程序。
所以,如果你想在更高层次上协调异步工作,你需要采取不同的方法。可以为此自己编写代码,但它可能会变得混乱。我建议您首先尝试使用 TPL 数据流库中的 ActionBlock<T>
,将您的自定义任务计划程序传递给它的 ExecutionDataflowBlockOptions
。
[1] 这是一个简化。状态机将避免创建实际的 task 对象,除非必要(在这种情况下,它们是必需的,因为它们被调度到任务调度程序)。此外,只有 await
点 等待未完成 实际上会导致 "method split".
Stephen Cleary 的回答很好地解释了为什么您不能为此目的使用 TaskScheduler
以及如何使用 ActionBlock
来限制并行度。但是,如果您想为其添加优先级,我认为您必须手动进行。您使用 Dictionary
个队列的方法是合理的,一个简单的实现(不支持取消或完成)可能看起来像这样:
class Scheduler
{
private static readonly Priority[] Priorities =
(Priority[])Enum.GetValues(typeof(Priority));
private readonly IReadOnlyDictionary<Priority, ConcurrentQueue<Func<Task>>> queues;
private readonly ActionBlock<Func<Task>> executor;
private readonly SemaphoreSlim semaphore;
public Scheduler(int degreeOfParallelism)
{
queues = Priorities.ToDictionary(
priority => priority, _ => new ConcurrentQueue<Func<Task>>());
executor = new ActionBlock<Func<Task>>(
invocation => invocation(),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = degreeOfParallelism,
BoundedCapacity = degreeOfParallelism
});
semaphore = new SemaphoreSlim(0);
Task.Run(Watch);
}
private async Task Watch()
{
while (true)
{
await semaphore.WaitAsync();
// find item with highest priority and send it for execution
foreach (var priority in Priorities.Reverse())
{
Func<Task> invocation;
if (queues[priority].TryDequeue(out invocation))
{
await executor.SendAsync(invocation);
}
}
}
}
public void Invoke(Func<Task> invocation, Priority priority)
{
queues[priority].Enqueue(invocation);
semaphore.Release(1);
}
}
我正在创建一个通用助手 class,它将帮助确定对 API 发出的请求的优先级,同时限制它们发生的并行化。
考虑下面应用的关键方法;
public IQueuedTaskHandle<TResponse> InvokeRequest<TResponse>(Func<TClient, Task<TResponse>> invocation, QueuedClientPriority priority, CancellationToken ct) where TResponse : IServiceResponse
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
_logger.Debug("Queueing task.");
var taskToQueue = Task.Factory.StartNew(async () =>
{
_logger.Debug("Starting request {0}", Task.CurrentId);
return await invocation(_client);
}, cts.Token, TaskCreationOptions.None, _schedulers[priority]).Unwrap();
taskToQueue.ContinueWith(task => _logger.Debug("Finished task {0}", task.Id), cts.Token);
return new EcosystemQueuedTaskHandle<TResponse>(cts, priority, taskToQueue);
}
无需过多赘述,我想在轮到队列中时调用 Task<TResponse>> invocation
返回的任务。我正在使用由 QueuedTaskScheduler
构造的队列集合,由唯一枚举索引;
_queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 3);
_schedulers = new Dictionary<QueuedClientPriority, TaskScheduler>();
//Enumerate the priorities
foreach (var priority in Enum.GetValues(typeof(QueuedClientPriority)))
{
_schedulers.Add((QueuedClientPriority)priority, _queuedTaskScheduler.ActivateNewQueue((int)priority));
}
然而,我无法在有限的并行环境中执行任务,但收效甚微,导致 100 个 API 请求在一个大批量中构建、触发和完成。我可以使用 Fiddler 会话来说明这一点;
我已经阅读了一些有趣的文章和 SO 帖子 (here, here and here),我认为它们会详细说明如何解决这个问题,但到目前为止我还没有弄明白。据我了解,lambda 的 async
性质按照设计在连续结构中工作,这将生成的任务标记为已完成,基本上 "insta-completing" 它。这意味着虽然队列工作正常,但在自定义调度程序上运行生成的 Task<T>
却成了问题所在。
This means that whilst the queues are working fine, runing a generated Task on a custom scheduler is turning out to be the problem.
正确。一种思考方式[1] 是将 async
方法拆分为多个任务 - 它在每个 await
点分解。这些 "sub-tasks" 中的每一个都在任务计划程序上 运行。因此,async
方法 将 运行 完全在任务调度程序上(假设您不使用 ConfigureAwait(false)
),但在每个 await
它将 离开 任务调度程序,然后在 await
完成后 重新进入 该任务调度程序。
所以,如果你想在更高层次上协调异步工作,你需要采取不同的方法。可以为此自己编写代码,但它可能会变得混乱。我建议您首先尝试使用 TPL 数据流库中的 ActionBlock<T>
,将您的自定义任务计划程序传递给它的 ExecutionDataflowBlockOptions
。
[1] 这是一个简化。状态机将避免创建实际的 task 对象,除非必要(在这种情况下,它们是必需的,因为它们被调度到任务调度程序)。此外,只有 await
点 等待未完成 实际上会导致 "method split".
Stephen Cleary 的回答很好地解释了为什么您不能为此目的使用 TaskScheduler
以及如何使用 ActionBlock
来限制并行度。但是,如果您想为其添加优先级,我认为您必须手动进行。您使用 Dictionary
个队列的方法是合理的,一个简单的实现(不支持取消或完成)可能看起来像这样:
class Scheduler
{
private static readonly Priority[] Priorities =
(Priority[])Enum.GetValues(typeof(Priority));
private readonly IReadOnlyDictionary<Priority, ConcurrentQueue<Func<Task>>> queues;
private readonly ActionBlock<Func<Task>> executor;
private readonly SemaphoreSlim semaphore;
public Scheduler(int degreeOfParallelism)
{
queues = Priorities.ToDictionary(
priority => priority, _ => new ConcurrentQueue<Func<Task>>());
executor = new ActionBlock<Func<Task>>(
invocation => invocation(),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = degreeOfParallelism,
BoundedCapacity = degreeOfParallelism
});
semaphore = new SemaphoreSlim(0);
Task.Run(Watch);
}
private async Task Watch()
{
while (true)
{
await semaphore.WaitAsync();
// find item with highest priority and send it for execution
foreach (var priority in Priorities.Reverse())
{
Func<Task> invocation;
if (queues[priority].TryDequeue(out invocation))
{
await executor.SendAsync(invocation);
}
}
}
}
public void Invoke(Func<Task> invocation, Priority priority)
{
queues[priority].Enqueue(invocation);
semaphore.Release(1);
}
}