如何在.NET 中自定义任务调度?
How to customize task scheduling in .NET?
我正在努力实现以下目标:
主线程应该创建一些任务(实际上,可能有很多线程,但让我们现在简化一些事情以避免并发和并行的额外复杂性),这些任务以某种方式被调度并稍后执行。也许在固定的时间间隔内由某些 Timer
执行,也许稍后在同一线程上为这些任务阻塞时执行,也许由专门为此任务指定的另一个线程执行 - 目前调度的实现不是 well-defined,我只是想了解基本思想。
我希望我的代码看起来像这样:
'Somewhere in the main thread...
Dim MyTask = CreateTask(Of Integer)(Function()
Console.WriteLine("Task has been called!")
'Some activity...
Return 42
End Sub)
'...
UseTheResult(MyTask.Result)
...其中 CreateTask
是响应将委托安排为稍后执行的任务的子例程。
问题是,我不知道如何实现这个概念。起初,我尝试过使用SynchronizationContext
,但它似乎更多的是私有线程内存,与调度任务无关(虽然Windows UI似乎以某种方式将它用于这个目标 - 我还没有清楚地理解所有这些上下文)。然后我偶然发现了 TaskScheduler
,它与 TaskFactory
相结合,似乎可以做到这一点。我已经尝试实现这个线程,但我的代码没有按预期工作,这意味着我遗漏了一些关于 TaskScheduler
如何工作的非常重要的东西,或者更糟的是 - 关于整个概念.我已经阅读了一些关于该主题的文章(最值得注意的是 TaskScheduler
上的 MSDN documentation 及其代码示例),但仍然不明白这是如何工作的。
我的 not-working-as-expected-code,它应该安排并执行任务,它们之间有 100 毫秒的延迟(所以整个测试应该 运行ning 约 10 秒):
Public Sub Main()
Dim T As New TaskFactory(New TestScheduler)
Dim tasks As New List(Of Task)
For I = 1 To 100
Dim J = I
tasks.Add(T.StartNew(Sub() Console.WriteLine(J)))
Next I
Task.WaitAll(tasks.ToArray)
Console.WriteLine("Test end")
Console.ReadLine()
End Sub
Class TestScheduler : Inherits TaskScheduler
Dim Tasks As New Collections.Concurrent.ConcurrentQueue(Of Task)
'Dim Counter As New Threading.AutoResetEvent(False)
Dim CTimer As New Threading.Timer(AddressOf TimerUp, Nothing, 0, 100)
Protected Overrides Function GetScheduledTasks() As IEnumerable(Of Task)
Return Tasks
End Function
Private Sub TimerUp(State As Object)
Static AllTasks% = 0
Dim T As task = Nothing
If Tasks.TryDequeue(T) Then
Console.WriteLine("Task... {0}", AllTasks) 'for debugging
AllTasks += 1
If Not MyBase.TryExecuteTask(T) Then
QueueTask(T)
End If
End If
End Sub
Protected Overrides Sub QueueTask(task As Task)
Tasks.Enqueue(task)
End Sub
Protected Overrides Function TryExecuteTaskInline(task As Task, taskWasPreviouslyQueued As Boolean) As Boolean
If taskWasPreviouslyQueued Then Return False
Return MyBase.TryExecuteTask(task)
End Function
End Class
'Output:
'2
'1
'2
'... and that's it, no even "Test end" or any other output whatsoever.
'I have strong feeling that it throws an exception somewhere; however,
'when I try to debug it, it silently stops, without waiting for <Enter>,
'and when I run it without debugging - it produces the abovementioned output without
'any exceptions or messages.
而且我有一种强烈的感觉,我误解了这里的一些基本概念。
总而言之,我的问题是:如何在 .NET 中自定义任务调度?
我应该使用什么 - SynchronizationContext
、TaskScheduler
、派生 类 或其他?如果是这样,我应该覆盖哪些确切的方法,它们的含义是什么?所有这些 QueueTask
、TryExecuteTaskInline
和 TryExecuteTask
简直让我抓狂...
P.S.: 请原谅,如果您愿意,请更正我在那里犯的任何语法或逻辑错误。英语不是我的母语,我现在想得不是很清楚 [undersleeping]。谢谢!
编辑: 非常抱歉误导大家。代码运行 完美 ,我刚刚犯了一个非常粗鲁的错误,忘记将我的项目标记为 "Start-Up" - 因此不小心 运行 进行了另一次测试项目在同一个解决方案中......起初我打算关闭这个问题,但后来我意识到最好回答它 - 它应该可以帮助任何遇到同样问题的人,而且最重要的是,允许更好解决方案:到目前为止,许多问题都没有得到解答。为什么我们需要 TaskScheduler.TryExecuteTask
- 为什么 "Try"?这个方法什么时候可以returnFalse
? TryExecuteTaskInline
方法什么时候执行 运行,什么时候可以用 taskWasPreviouslyQueued = true
执行?我有一些假设,但同样,它们可能是错误的。
我是否需要将这些主题作为单独的问题提出,或者,因为它们仍然属于这个问题的标题,所以它们可以存在吗?
因此,为了实现这一目标,可以结合使用 TaskScheduler
和 TaskFactory
,这对我来说非常有效:
Public Sub Main()
Dim T As New TaskFactory(New TestScheduler)
Dim tasks As New List(Of Task)
For I = 1 To 100
Dim J = I
tasks.Add(T.StartNew(Sub() Console.WriteLine(J)))
Next I
Task.WaitAll(tasks.ToArray)
Console.WriteLine("Test end")
Console.ReadLine()
End Sub
Public Function CreateTask(ActionToSchedule As Action) As Task
'This is exactly the needed function!
'A single static instances of both TaskFactory and custom TaskSheduler are used.
Static Factory As New TaskFactory(New TestScheduler)
Return Factory.StartNew(ActionToSchedule)
End Function
Class TestScheduler : Inherits TaskScheduler
Dim Tasks As New Collections.Concurrent.ConcurrentQueue(Of Task)
Dim CTimer As New Threading.Timer(AddressOf TimerUp, Nothing, 0, 10)
Protected Overrides Function GetScheduledTasks() As IEnumerable(Of Task)
Return Tasks
End Function
Private Sub TimerUp(State As Object)
Static AllTasks% = 0
Dim T As task = Nothing
If Tasks.TryDequeue(T) Then
Console.WriteLine("Task... {0}", AllTasks) 'for debugging purposes
AllTasks += 1
If Not MyBase.TryExecuteTask(T) Then
QueueTask(T)
End If
End If
End Sub
Protected Overrides Sub QueueTask(task As Task)
'Here you should schedule the given task for later execution, whatever that means.
'In this case it is simply put in the queue to be executed later when the timer fires.
Tasks.Enqueue(task)
End Sub
Protected Overrides Function TryExecuteTaskInline(task As Task, taskWasPreviouslyQueued As Boolean) As Boolean
'> Attempts to execute the specified task on the current thread.
'As far as I understand - when the thread starts to wait for the task completion, either by trying to read
'"task.Result" or executing "T.RunSynchronously()", this function will allow to execute the task synchronously
'on the waiting thread instead of blocking, which may lead to a huge performance&speed gain.
'If our task is not executing - waiting somewhere in the queue or is not even enqueued yet - we
'may dequeue it and run instantly to avoid the blocking.
Return False
'In this particular case, I've decided that the timer should run all the tasks, and, therefore, there must be done
'no inlining. Hovewer, the code below (obviously, it will not execute) should illustrate
'this idea.
If taskWasPreviouslyQueued Then
'If the task is already enqueued, then we may try to remove it from
'the queue and execute there instead.
If Tasks.TryDequeue(task) Then
Return MyBase.TryExecuteTask(task)
Else
'It has already been dequeued and maybe even completed already, so - no, no inlining. :)
Return False
End If
Else
'If it wasn't even enqueued - ha! No questions - we may run it now without any preparations.
Return MyBase.TryExecuteTask(task)
End If
End Function
End Class
我仍然不确定错误的问题的更正和完成是否应该作为答案发布,但我希望它比简单地关闭它对社区更有好处。
我正在努力实现以下目标:
主线程应该创建一些任务(实际上,可能有很多线程,但让我们现在简化一些事情以避免并发和并行的额外复杂性),这些任务以某种方式被调度并稍后执行。也许在固定的时间间隔内由某些 Timer
执行,也许稍后在同一线程上为这些任务阻塞时执行,也许由专门为此任务指定的另一个线程执行 - 目前调度的实现不是 well-defined,我只是想了解基本思想。
我希望我的代码看起来像这样:
'Somewhere in the main thread...
Dim MyTask = CreateTask(Of Integer)(Function()
Console.WriteLine("Task has been called!")
'Some activity...
Return 42
End Sub)
'...
UseTheResult(MyTask.Result)
...其中 CreateTask
是响应将委托安排为稍后执行的任务的子例程。
问题是,我不知道如何实现这个概念。起初,我尝试过使用SynchronizationContext
,但它似乎更多的是私有线程内存,与调度任务无关(虽然Windows UI似乎以某种方式将它用于这个目标 - 我还没有清楚地理解所有这些上下文)。然后我偶然发现了 TaskScheduler
,它与 TaskFactory
相结合,似乎可以做到这一点。我已经尝试实现这个线程,但我的代码没有按预期工作,这意味着我遗漏了一些关于 TaskScheduler
如何工作的非常重要的东西,或者更糟的是 - 关于整个概念.我已经阅读了一些关于该主题的文章(最值得注意的是 TaskScheduler
上的 MSDN documentation 及其代码示例),但仍然不明白这是如何工作的。
我的 not-working-as-expected-code,它应该安排并执行任务,它们之间有 100 毫秒的延迟(所以整个测试应该 运行ning 约 10 秒):
Public Sub Main()
Dim T As New TaskFactory(New TestScheduler)
Dim tasks As New List(Of Task)
For I = 1 To 100
Dim J = I
tasks.Add(T.StartNew(Sub() Console.WriteLine(J)))
Next I
Task.WaitAll(tasks.ToArray)
Console.WriteLine("Test end")
Console.ReadLine()
End Sub
Class TestScheduler : Inherits TaskScheduler
Dim Tasks As New Collections.Concurrent.ConcurrentQueue(Of Task)
'Dim Counter As New Threading.AutoResetEvent(False)
Dim CTimer As New Threading.Timer(AddressOf TimerUp, Nothing, 0, 100)
Protected Overrides Function GetScheduledTasks() As IEnumerable(Of Task)
Return Tasks
End Function
Private Sub TimerUp(State As Object)
Static AllTasks% = 0
Dim T As task = Nothing
If Tasks.TryDequeue(T) Then
Console.WriteLine("Task... {0}", AllTasks) 'for debugging
AllTasks += 1
If Not MyBase.TryExecuteTask(T) Then
QueueTask(T)
End If
End If
End Sub
Protected Overrides Sub QueueTask(task As Task)
Tasks.Enqueue(task)
End Sub
Protected Overrides Function TryExecuteTaskInline(task As Task, taskWasPreviouslyQueued As Boolean) As Boolean
If taskWasPreviouslyQueued Then Return False
Return MyBase.TryExecuteTask(task)
End Function
End Class
'Output:
'2
'1
'2
'... and that's it, no even "Test end" or any other output whatsoever.
'I have strong feeling that it throws an exception somewhere; however,
'when I try to debug it, it silently stops, without waiting for <Enter>,
'and when I run it without debugging - it produces the abovementioned output without
'any exceptions or messages.
而且我有一种强烈的感觉,我误解了这里的一些基本概念。
总而言之,我的问题是:如何在 .NET 中自定义任务调度?
我应该使用什么 - SynchronizationContext
、TaskScheduler
、派生 类 或其他?如果是这样,我应该覆盖哪些确切的方法,它们的含义是什么?所有这些 QueueTask
、TryExecuteTaskInline
和 TryExecuteTask
简直让我抓狂...
P.S.: 请原谅,如果您愿意,请更正我在那里犯的任何语法或逻辑错误。英语不是我的母语,我现在想得不是很清楚 [undersleeping]。谢谢!
编辑: 非常抱歉误导大家。代码运行 完美 ,我刚刚犯了一个非常粗鲁的错误,忘记将我的项目标记为 "Start-Up" - 因此不小心 运行 进行了另一次测试项目在同一个解决方案中......起初我打算关闭这个问题,但后来我意识到最好回答它 - 它应该可以帮助任何遇到同样问题的人,而且最重要的是,允许更好解决方案:到目前为止,许多问题都没有得到解答。为什么我们需要 TaskScheduler.TryExecuteTask
- 为什么 "Try"?这个方法什么时候可以returnFalse
? TryExecuteTaskInline
方法什么时候执行 运行,什么时候可以用 taskWasPreviouslyQueued = true
执行?我有一些假设,但同样,它们可能是错误的。
我是否需要将这些主题作为单独的问题提出,或者,因为它们仍然属于这个问题的标题,所以它们可以存在吗?
因此,为了实现这一目标,可以结合使用 TaskScheduler
和 TaskFactory
,这对我来说非常有效:
Public Sub Main()
Dim T As New TaskFactory(New TestScheduler)
Dim tasks As New List(Of Task)
For I = 1 To 100
Dim J = I
tasks.Add(T.StartNew(Sub() Console.WriteLine(J)))
Next I
Task.WaitAll(tasks.ToArray)
Console.WriteLine("Test end")
Console.ReadLine()
End Sub
Public Function CreateTask(ActionToSchedule As Action) As Task
'This is exactly the needed function!
'A single static instances of both TaskFactory and custom TaskSheduler are used.
Static Factory As New TaskFactory(New TestScheduler)
Return Factory.StartNew(ActionToSchedule)
End Function
Class TestScheduler : Inherits TaskScheduler
Dim Tasks As New Collections.Concurrent.ConcurrentQueue(Of Task)
Dim CTimer As New Threading.Timer(AddressOf TimerUp, Nothing, 0, 10)
Protected Overrides Function GetScheduledTasks() As IEnumerable(Of Task)
Return Tasks
End Function
Private Sub TimerUp(State As Object)
Static AllTasks% = 0
Dim T As task = Nothing
If Tasks.TryDequeue(T) Then
Console.WriteLine("Task... {0}", AllTasks) 'for debugging purposes
AllTasks += 1
If Not MyBase.TryExecuteTask(T) Then
QueueTask(T)
End If
End If
End Sub
Protected Overrides Sub QueueTask(task As Task)
'Here you should schedule the given task for later execution, whatever that means.
'In this case it is simply put in the queue to be executed later when the timer fires.
Tasks.Enqueue(task)
End Sub
Protected Overrides Function TryExecuteTaskInline(task As Task, taskWasPreviouslyQueued As Boolean) As Boolean
'> Attempts to execute the specified task on the current thread.
'As far as I understand - when the thread starts to wait for the task completion, either by trying to read
'"task.Result" or executing "T.RunSynchronously()", this function will allow to execute the task synchronously
'on the waiting thread instead of blocking, which may lead to a huge performance&speed gain.
'If our task is not executing - waiting somewhere in the queue or is not even enqueued yet - we
'may dequeue it and run instantly to avoid the blocking.
Return False
'In this particular case, I've decided that the timer should run all the tasks, and, therefore, there must be done
'no inlining. Hovewer, the code below (obviously, it will not execute) should illustrate
'this idea.
If taskWasPreviouslyQueued Then
'If the task is already enqueued, then we may try to remove it from
'the queue and execute there instead.
If Tasks.TryDequeue(task) Then
Return MyBase.TryExecuteTask(task)
Else
'It has already been dequeued and maybe even completed already, so - no, no inlining. :)
Return False
End If
Else
'If it wasn't even enqueued - ha! No questions - we may run it now without any preparations.
Return MyBase.TryExecuteTask(task)
End If
End Function
End Class
我仍然不确定错误的问题的更正和完成是否应该作为答案发布,但我希望它比简单地关闭它对社区更有好处。