正确取消长时间 "string" 的任务延续?强制任务执行顺序和线程使用?
Cancelling a long "string" of Task continuations properly? Forcing Task execution order and thread usage?
我有一个 class 将小块 IO 工作串在一起作为 Task
延续。每次收到一件作品时,都会创建一个新的 Task
,然后将其添加为 LastCreatedTask
的延续。我正在尝试确定正确取消所有这些 Task
的正确方法?
这是我目前的设置
private Task LastCreatedTask { get; set; }
private CancellationTokenSource TaskQueueTokenSource { get; set; }
public void ScheduleChunk(IOWorkChunk inChunk, int procTimeout)
{
Task ioChunkProcessTask = CreateProcessTask(inChunk, procTimeout);
LastCreatedTask.ContinueWith(
(t) => ioChunkProcessTask.Start(),
TaskContinuationOptions.ExecuteSynchronously);
LastCreatedTask = ioChunkProcessTask;
}
private Task CreateProcessTask(IOWorkChunk inChunk, int procTimeout)
{
// Create a TokenSource that will cancel after a given timeout period
var ProcessTimeoutTokenSource = new CancellationTokenSource(
TimeSpan.FromSeconds(procTimeout));
// Create a TokenSource for the Task that is a
// link between the timeout and "main" token source
var ProcessTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
TaskQueueTokenSource.Token,
ProcessTimeoutTokenSource.Token);
// Create a Task that will do the actual processing
Task ioChunkProcessTask = new Task(() =>
{
if(!ProcessTokenSource.Token.IsCancellationRequested)
inChunk.DoProcessing(ProcessTokenSource.Token);
}, ProcessTokenSource.Token);
return ioChunkProcessTask;
}
所以在函数 ScheduleChunk
中,一个 "chunk" 的 IO 工作(和一个超时)被传递给 CreateProcessTask
,它创建一个 Task
来进行实际处理的 IO 工作。 CancellationToken
被传递给这个 Task
,这是通过将两个 CancellationTokenSources
链接在一起而形成的。
第一个是"main"CancellationTokenSource
;我希望能够简单地在此源上调用 Cancel
来取消链接的所有 Task
。第二个源是在给定的一段时间后自动取消的源(这会停止长 running/stalled IO 块)。
最后,一旦构造的 Task
返回到 ScheduleChunk
,它就会作为延续添加到 LastCreatedTask
,这是作为延续添加的最后一个任务。这实际上使 Task
和 运行 的链条一个接一个地排列。
1. 我上面的方法是取消Task
链的正确方法吗?通过在 TaskQueueTokenSource
?
上调用 Cancel
2. 使用 TaskContinuationOptions.ExecuteSynchronously
和延续是否是确保这些 Task
执行的正确方法一个接一个地排序?
3. 有没有办法指定我希望来自底层 TPL ThreadPool
的同一个线程在这条链上工作Task
?
据我所知,不应为每个延续点创建一个新线程,尽管在某些时候新线程可能会拾取链。
使用传递给每个任务的共享取消令牌,而不是为每个任务创建唯一的取消令牌。当您取消该令牌时,所有使用该令牌的任务将知道停止处理。
你在我回答后进行了编辑,所以我会回复你的个人编号问题:
1. 我上面的方法是取消任务链的正确方法吗?通过在 TaskQueueTokenSource?
上调用 Cancel
According to msdn,最佳做法是创建一个令牌,然后将相同的令牌传递给每个任务。
2. 使用 TaskContinuationOptions.ExecuteSynchronously 和延续是否是确保这些任务在任务之后按顺序执行的正确方法其他?
不,你的任务应该是 运行 并行的,最好的做法是让依赖于顺序的任务沿着链条相互调用,第一个调用第二个,依此类推在。或者,您可以等到第一个任务完成后再开始第二个任务。
3. 有没有办法指定我希望来自底层 TPL 线程池的同一个线程在这个任务链上工作?
您不太可能这样做,线程池和任务异步编程 (TAP) 和 TPL 的部分目的是抽象显式线程。您无法保证 运行 任务 运行 所在的线程,甚至不需要大量工作就可以为该任务生成新线程。
也就是说,如果出于某种原因您确实需要这样做 a custom task scheduler is the answer
现有代码根本不起作用:
LastCreatedTask.ContinueWith((t) => ioChunkProcessTask)
此延续只是 returns 一个任务,其结果将几乎立即设置为常量对象。仅此而已。
真的,这段代码的结构很笨拙。这样好多了:
async Task RunProcessing() {
while (...) {
await CreateProcessTask(...);
}
}
await
用于排序异步操作。大多数时候 await
可以代替 ContinueWith
。
取消看起来不错。也许它可以简化一点,但它工作正常。
关于 3,你永远不需要这个。线程亲和性是一种罕见的东西,应该避免。没有办法完全按照要求执行此操作。请详细说明您想要实现的目标。
如果你坚持使用Task.Run,这里有一个草图:
Task CreateProcessTask(IOWorkChunk inChunk, int procTimeout)
{
// Create a TokenSource that will cancel after a given timeout period
var ProcessTimeoutTokenSource = new CancellationTokenSource(
TimeSpan.FromSeconds(procTimeout));
// Create a TokenSource for the Task that is a
// link between the timeout and "main" token source
var ProcessTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
TaskQueueTokenSource.Token,
ProcessTimeoutTokenSource.Token);
return Task.Run(() => {
inChunk.DoProcessing(ProcessTokenSource.Token);
}, ProcessTokenSource.Token);
}
我有一个 class 将小块 IO 工作串在一起作为 Task
延续。每次收到一件作品时,都会创建一个新的 Task
,然后将其添加为 LastCreatedTask
的延续。我正在尝试确定正确取消所有这些 Task
的正确方法?
这是我目前的设置
private Task LastCreatedTask { get; set; }
private CancellationTokenSource TaskQueueTokenSource { get; set; }
public void ScheduleChunk(IOWorkChunk inChunk, int procTimeout)
{
Task ioChunkProcessTask = CreateProcessTask(inChunk, procTimeout);
LastCreatedTask.ContinueWith(
(t) => ioChunkProcessTask.Start(),
TaskContinuationOptions.ExecuteSynchronously);
LastCreatedTask = ioChunkProcessTask;
}
private Task CreateProcessTask(IOWorkChunk inChunk, int procTimeout)
{
// Create a TokenSource that will cancel after a given timeout period
var ProcessTimeoutTokenSource = new CancellationTokenSource(
TimeSpan.FromSeconds(procTimeout));
// Create a TokenSource for the Task that is a
// link between the timeout and "main" token source
var ProcessTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
TaskQueueTokenSource.Token,
ProcessTimeoutTokenSource.Token);
// Create a Task that will do the actual processing
Task ioChunkProcessTask = new Task(() =>
{
if(!ProcessTokenSource.Token.IsCancellationRequested)
inChunk.DoProcessing(ProcessTokenSource.Token);
}, ProcessTokenSource.Token);
return ioChunkProcessTask;
}
所以在函数 ScheduleChunk
中,一个 "chunk" 的 IO 工作(和一个超时)被传递给 CreateProcessTask
,它创建一个 Task
来进行实际处理的 IO 工作。 CancellationToken
被传递给这个 Task
,这是通过将两个 CancellationTokenSources
链接在一起而形成的。
第一个是"main"CancellationTokenSource
;我希望能够简单地在此源上调用 Cancel
来取消链接的所有 Task
。第二个源是在给定的一段时间后自动取消的源(这会停止长 running/stalled IO 块)。
最后,一旦构造的 Task
返回到 ScheduleChunk
,它就会作为延续添加到 LastCreatedTask
,这是作为延续添加的最后一个任务。这实际上使 Task
和 运行 的链条一个接一个地排列。
1. 我上面的方法是取消Task
链的正确方法吗?通过在 TaskQueueTokenSource
?
Cancel
2. 使用 TaskContinuationOptions.ExecuteSynchronously
和延续是否是确保这些 Task
执行的正确方法一个接一个地排序?
3. 有没有办法指定我希望来自底层 TPL ThreadPool
的同一个线程在这条链上工作Task
?
据我所知,不应为每个延续点创建一个新线程,尽管在某些时候新线程可能会拾取链。
使用传递给每个任务的共享取消令牌,而不是为每个任务创建唯一的取消令牌。当您取消该令牌时,所有使用该令牌的任务将知道停止处理。
你在我回答后进行了编辑,所以我会回复你的个人编号问题:
1. 我上面的方法是取消任务链的正确方法吗?通过在 TaskQueueTokenSource?
上调用 CancelAccording to msdn,最佳做法是创建一个令牌,然后将相同的令牌传递给每个任务。
2. 使用 TaskContinuationOptions.ExecuteSynchronously 和延续是否是确保这些任务在任务之后按顺序执行的正确方法其他?
不,你的任务应该是 运行 并行的,最好的做法是让依赖于顺序的任务沿着链条相互调用,第一个调用第二个,依此类推在。或者,您可以等到第一个任务完成后再开始第二个任务。
3. 有没有办法指定我希望来自底层 TPL 线程池的同一个线程在这个任务链上工作?
您不太可能这样做,线程池和任务异步编程 (TAP) 和 TPL 的部分目的是抽象显式线程。您无法保证 运行 任务 运行 所在的线程,甚至不需要大量工作就可以为该任务生成新线程。
也就是说,如果出于某种原因您确实需要这样做 a custom task scheduler is the answer
现有代码根本不起作用:
LastCreatedTask.ContinueWith((t) => ioChunkProcessTask)
此延续只是 returns 一个任务,其结果将几乎立即设置为常量对象。仅此而已。
真的,这段代码的结构很笨拙。这样好多了:
async Task RunProcessing() {
while (...) {
await CreateProcessTask(...);
}
}
await
用于排序异步操作。大多数时候 await
可以代替 ContinueWith
。
取消看起来不错。也许它可以简化一点,但它工作正常。
关于 3,你永远不需要这个。线程亲和性是一种罕见的东西,应该避免。没有办法完全按照要求执行此操作。请详细说明您想要实现的目标。
如果你坚持使用Task.Run,这里有一个草图:
Task CreateProcessTask(IOWorkChunk inChunk, int procTimeout)
{
// Create a TokenSource that will cancel after a given timeout period
var ProcessTimeoutTokenSource = new CancellationTokenSource(
TimeSpan.FromSeconds(procTimeout));
// Create a TokenSource for the Task that is a
// link between the timeout and "main" token source
var ProcessTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
TaskQueueTokenSource.Token,
ProcessTimeoutTokenSource.Token);
return Task.Run(() => {
inChunk.DoProcessing(ProcessTokenSource.Token);
}, ProcessTokenSource.Token);
}