取消请求到所有任务的传播时间 (TPL)
Propagation time of cancellation request to all tasks (TPL)
使用 TPL 我们有 CancellationTokenSource
提供令牌,可用于合作取消当前任务(或其开始)。
问题:
将取消请求传播到所有挂钩的 运行 任务需要多长时间?
有没有什么地方可以让代码检查:"from now" 每个感兴趣的 Task
,会发现已请求取消?
为什么需要它?
我想要稳定的单元测试,以证明取消在我们的代码中有效。
问题详情:
我们有 "Executor" 生成任务,这些任务包含一些长的 运行 动作。执行器的主要工作是限制启动的并发操作数。所有这些任务都可以单独取消,而且这些操作将在内部遵守 CancellationToken
。
我想提供单元测试,它表明当任务正在等待 slot 开始时发生取消 given action,该任务将(最终)自行取消,并且不会开始执行 给定的操作。
所以,我的想法是用单个 插槽 准备 LimitingExecutor
。然后开始屏蔽动作,解除屏蔽后会请求取消。然后"enqueue"测试动作,执行时应该会失败。使用该设置,测试将调用 unblock,然后断言 test action 的任务将在等待时抛出 TaskCanceledException
。
[Test]
public void RequestPropagationTest()
{
using (var setupEvent = new ManualResetEvent(initialState: false))
using (var cancellation = new CancellationTokenSource())
using (var executor = new LimitingExecutor())
{
// System-state setup action:
var cancellingTask = executor.Do(() =>
{
setupEvent.WaitOne();
cancellation.Cancel();
}, CancellationToken.None);
// Main work action:
var actionTask = executor.Do(() =>
{
throw new InvalidOperationException(
"This action should be cancelled!");
}, cancellation.Token);
// Let's wait until this `Task` starts, so it will got opportunity
// to cancel itself, and expected later exception will not come
// from just starting that action by `Task.Run` with token:
while (actionTask.Status < TaskStatus.Running)
Thread.Sleep(millisecondsTimeout: 1);
// Let's unblock slot in Executor for the 'main work action'
// by finalizing the 'system-state setup action' which will
// finally request "global" cancellation:
setupEvent.Set();
Assert.DoesNotThrowAsync(
async () => await cancellingTask);
Assert.ThrowsAsync<TaskCanceledException>(
async () => await actionTask);
}
}
public class LimitingExecutor : IDisposable
{
private const int UpperLimit = 1;
private readonly Semaphore _semaphore
= new Semaphore(UpperLimit, UpperLimit);
public Task Do(Action work, CancellationToken token)
=> Task.Run(() =>
{
_semaphore.WaitOne();
try
{
token.ThrowIfCancellationRequested();
work();
}
finally
{
_semaphore.Release();
}
}, token);
public void Dispose()
=> _semaphore.Dispose();
}
可以在 GitHub.
找到此问题的可执行演示(通过 NUnit)
但是,该测试的实施有时会失败(没有预料到 TaskCanceledException
),在我的机器上可能 10 次运行中有 1 次失败。这个问题的 "solution" 是在取消请求后立即插入 Thread.Sleep
。即使睡眠 3 秒,此测试有时也会失败(在运行 20 次左右后发现),而当它通过时,通常不需要长时间等待(我猜)。供参考,请参阅diff。
"Other problem",是为了确保取消来自 "waiting time" 而不是来自 Task.Run
,因为 ThreadPool
可能很忙(其他正在执行的测试),而且很冷在取消请求后推迟第二个任务的开始 - 这将呈现此测试 "falsy-green"。 "easy fix by hack" 是主动等待第二个任务开始 - 它的 Status
变成 TaskStatus.Running
。请检查这个 branch 下的版本,看看没有这个 hack 的测试有时会 "green" - 所以示例错误可以通过它。
您的测试方法假定 cancellingTask
总是在 actionTask
之前占用 LimitingExecutor
中的插槽(进入信号量)。不幸的是,这个假设是错误的,LimitingExecutor
并不能保证这一点,这只是运气问题,这两个任务中的哪一个占据了位置(实际上在我的电脑上它只发生在大约 5% 的运行中)。
要解决此问题,您需要另一个 ManualResetEvent
,这将允许主线程等待直到 cancellingTask
实际占用插槽:
using (var slotTaken = new ManualResetEvent(initialState: false))
using (var setupEvent = new ManualResetEvent(initialState: false))
using (var cancellation = new CancellationTokenSource())
using (var executor = new LimitingExecutor())
{
// System-state setup action:
var cancellingTask = executor.Do(() =>
{
// This is called from inside the semaphore, so it's
// certain that this task occupies the only available slot.
slotTaken.Set();
setupEvent.WaitOne();
cancellation.Cancel();
}, CancellationToken.None);
// Wait until cancellingTask takes the slot
slotTaken.WaitOne();
// Now it's guaranteed that cancellingTask takes the slot, not the actionTask
// ...
}
.NET Framework 不提供 API 来检测任务转换到 Running
状态,所以如果您不喜欢轮询 State
属性 + Thread.Sleep()
在循环中,您需要修改 LimitingExecutor.Do()
以提供此信息,可能使用另一个 ManualResetEvent
,例如:
public Task Do(Action work, CancellationToken token, ManualResetEvent taskRunEvent = null)
=> Task.Run(() =>
{
// Optional notification to the caller that task is now running
taskRunEvent?.Set();
// ...
}, token);
使用 TPL 我们有 CancellationTokenSource
提供令牌,可用于合作取消当前任务(或其开始)。
问题:
将取消请求传播到所有挂钩的 运行 任务需要多长时间?
有没有什么地方可以让代码检查:"from now" 每个感兴趣的 Task
,会发现已请求取消?
为什么需要它?
我想要稳定的单元测试,以证明取消在我们的代码中有效。
问题详情:
我们有 "Executor" 生成任务,这些任务包含一些长的 运行 动作。执行器的主要工作是限制启动的并发操作数。所有这些任务都可以单独取消,而且这些操作将在内部遵守 CancellationToken
。
我想提供单元测试,它表明当任务正在等待 slot 开始时发生取消 given action,该任务将(最终)自行取消,并且不会开始执行 给定的操作。
所以,我的想法是用单个 插槽 准备 LimitingExecutor
。然后开始屏蔽动作,解除屏蔽后会请求取消。然后"enqueue"测试动作,执行时应该会失败。使用该设置,测试将调用 unblock,然后断言 test action 的任务将在等待时抛出 TaskCanceledException
。
[Test]
public void RequestPropagationTest()
{
using (var setupEvent = new ManualResetEvent(initialState: false))
using (var cancellation = new CancellationTokenSource())
using (var executor = new LimitingExecutor())
{
// System-state setup action:
var cancellingTask = executor.Do(() =>
{
setupEvent.WaitOne();
cancellation.Cancel();
}, CancellationToken.None);
// Main work action:
var actionTask = executor.Do(() =>
{
throw new InvalidOperationException(
"This action should be cancelled!");
}, cancellation.Token);
// Let's wait until this `Task` starts, so it will got opportunity
// to cancel itself, and expected later exception will not come
// from just starting that action by `Task.Run` with token:
while (actionTask.Status < TaskStatus.Running)
Thread.Sleep(millisecondsTimeout: 1);
// Let's unblock slot in Executor for the 'main work action'
// by finalizing the 'system-state setup action' which will
// finally request "global" cancellation:
setupEvent.Set();
Assert.DoesNotThrowAsync(
async () => await cancellingTask);
Assert.ThrowsAsync<TaskCanceledException>(
async () => await actionTask);
}
}
public class LimitingExecutor : IDisposable
{
private const int UpperLimit = 1;
private readonly Semaphore _semaphore
= new Semaphore(UpperLimit, UpperLimit);
public Task Do(Action work, CancellationToken token)
=> Task.Run(() =>
{
_semaphore.WaitOne();
try
{
token.ThrowIfCancellationRequested();
work();
}
finally
{
_semaphore.Release();
}
}, token);
public void Dispose()
=> _semaphore.Dispose();
}
可以在 GitHub.
找到此问题的可执行演示(通过 NUnit)但是,该测试的实施有时会失败(没有预料到 TaskCanceledException
),在我的机器上可能 10 次运行中有 1 次失败。这个问题的 "solution" 是在取消请求后立即插入 Thread.Sleep
。即使睡眠 3 秒,此测试有时也会失败(在运行 20 次左右后发现),而当它通过时,通常不需要长时间等待(我猜)。供参考,请参阅diff。
"Other problem",是为了确保取消来自 "waiting time" 而不是来自 Task.Run
,因为 ThreadPool
可能很忙(其他正在执行的测试),而且很冷在取消请求后推迟第二个任务的开始 - 这将呈现此测试 "falsy-green"。 "easy fix by hack" 是主动等待第二个任务开始 - 它的 Status
变成 TaskStatus.Running
。请检查这个 branch 下的版本,看看没有这个 hack 的测试有时会 "green" - 所以示例错误可以通过它。
您的测试方法假定 cancellingTask
总是在 actionTask
之前占用 LimitingExecutor
中的插槽(进入信号量)。不幸的是,这个假设是错误的,LimitingExecutor
并不能保证这一点,这只是运气问题,这两个任务中的哪一个占据了位置(实际上在我的电脑上它只发生在大约 5% 的运行中)。
要解决此问题,您需要另一个 ManualResetEvent
,这将允许主线程等待直到 cancellingTask
实际占用插槽:
using (var slotTaken = new ManualResetEvent(initialState: false))
using (var setupEvent = new ManualResetEvent(initialState: false))
using (var cancellation = new CancellationTokenSource())
using (var executor = new LimitingExecutor())
{
// System-state setup action:
var cancellingTask = executor.Do(() =>
{
// This is called from inside the semaphore, so it's
// certain that this task occupies the only available slot.
slotTaken.Set();
setupEvent.WaitOne();
cancellation.Cancel();
}, CancellationToken.None);
// Wait until cancellingTask takes the slot
slotTaken.WaitOne();
// Now it's guaranteed that cancellingTask takes the slot, not the actionTask
// ...
}
.NET Framework 不提供 API 来检测任务转换到 Running
状态,所以如果您不喜欢轮询 State
属性 + Thread.Sleep()
在循环中,您需要修改 LimitingExecutor.Do()
以提供此信息,可能使用另一个 ManualResetEvent
,例如:
public Task Do(Action work, CancellationToken token, ManualResetEvent taskRunEvent = null)
=> Task.Run(() =>
{
// Optional notification to the caller that task is now running
taskRunEvent?.Set();
// ...
}, token);