如何同步延迟的计划任务以公平(按顺序)执行它们?
How to synchronize delayed scheduled tasks to execute them fairly (in order)?
我正在编写一种方法来安排一些任务,这些任务需要在创建时以正确的顺序执行,每个任务都有固定的延迟。
基本上,我需要复制这个行为(其中Mx是一个调度任务的方法调用,Tx是它对应的任务):
M1--M2--M3--M4--M5--M6...
------T1--T2--T3--T4--T5...
这里的重点是这个方法被调用得非常频繁(不是我,所以我无法控制)并且我需要能够如果使用两个相互“无效”的参数调用它,则跳过它的执行(例如,如果最后一次调用是 Foo("a"),那么我得到 Foo("b") 和 Foo( “a”),我希望能够跳过最后两个调用,因为它们没用)。这就是我使用 Queue
而不是直接从每个方法调用内部安排任务的原因。
这是我目前的情况:
// Semaphore to synchronize the queue access
private readonly SemaphoreSlim QueueSemaphore = new SemaphoreSlim(1);
// Arguments queue
private readonly Queue<String> History = new Queue<String>();
// The last argument (needed to understand the useless calls)
private String _LastArgument;
protected void Foo(String arg)
{
// Wait and add the argument to the queue
QueueSemaphore.WaitAsync().ContinueWith(ts =>
{
History.Enqueue(arg);
QueueSemaphore.Release();
// Small delay (100ms are enough in this case)
Task.Delay(100).ContinueWith(td =>
{
// Wait again before accessing the queue
QueueSemaphore.WaitAsync().ContinueWith(tf =>
{
// Edge case for the removed calls
if (History.Count == 0)
{
QueueSemaphore.Release();
return;
}
// Get the next argument and test it
String next = History.Dequeue();
if (_LastArgument != null &&
History.Count > 0 &&
History.Peek().Equals(_LastArgument))
{
// Useless calls detected, skip them
StatesHistory.Dequeue();
QueueSemaphore.Release();
return;
}
_LastArgument= next;
SomeOtherMethodWithTheActualCode(next);
QueueSemaphore.Release();
}, TaskContinuationOptions.PreferFairness);
}, TaskContinuationOptions.PreferFairness);
}, TaskContinuationOptions.PreferFairness);
}
现在,我有两个问题:
- 根据我在文档中看到的内容,
TaskContinuationOptions.PreferFairness
标志只会尝试 来保持计划任务的原始顺序,但不保证它们将按相同的顺序执行
- 我多次听说
Task.Delay
方法不可靠,不应将其用于同步目的。因此,例如,如果延迟调用实际上需要 101 毫秒,而另一个延迟调用需要 99 毫秒或 98 毫秒,那可能会把整个事情搞砸。
- 由于此方法将在UI线程上执行,我想避免同步阻塞以避免UI
出现问题
感谢您的帮助!
我不确定我是否理解你的意思,但你可能想尝试异步循环(下面是一些伪代码):
正在向队列中添加项目:
async Task AddToQueueAsync(WorkItem item)
{
await LockQueueAsync();
queue.Add(item);
UnlockQueue();
}
并在无限循环中获取项目:
async Task InfiniteLoopThatExecutesTasksOneByOne()
{
while(true)
{
WorkItem item = null;
await LockQueueAsync();
item = InspectTheQueueAndSkipSomethingIfNeeded();
UnlockQueue();
if(item!=null)
await DispatchItemToUIThread(item);
await Task.Delay(delay);
}
}
有了循环,您的商品将始终被订购。作为一个缺点,您将拥有一些无限工作的代码,因此您需要某种机制在需要时 suspend/resume 它。此外,它不涵盖您的第三个问题,我目前想不出任何方法来异步实现精确延迟。
附带说明:您可以保留您安排的最后一个任务,并将新任务附加为之前的任务延续。这样你也可以保持秩序。
我认为只保留参数集合会容易得多:
public Task Execution { get; private set; } = StartAsync();
private List<string> _requests = new List<string>();
private string _currentRequest;
private async Task StartAsync()
{
while (true)
{
if (_requests.Count != 0)
{
_currentRequest = _requests[0];
_request.RemoveAt(0);
SomeOtherMethodWithTheActualCode(_currentRequest); // TODO: error handling
_currentRequest = null;
}
await Task.Delay(100);
}
}
protected void Foo(String arg)
{
var index = _requests.IndexOf(arg);
if (index != -1)
_requests.RemoveRange(index, _requests.Count - index);
else if (arg == _currentRequest)
_requests.Clear();
_requests.Add(arg);
}
此代码假定类型是从 UI 线程创建的(并调用了 Foo
)。
我正在编写一种方法来安排一些任务,这些任务需要在创建时以正确的顺序执行,每个任务都有固定的延迟。
基本上,我需要复制这个行为(其中Mx是一个调度任务的方法调用,Tx是它对应的任务):
M1--M2--M3--M4--M5--M6...
------T1--T2--T3--T4--T5...
这里的重点是这个方法被调用得非常频繁(不是我,所以我无法控制)并且我需要能够如果使用两个相互“无效”的参数调用它,则跳过它的执行(例如,如果最后一次调用是 Foo("a"),那么我得到 Foo("b") 和 Foo( “a”),我希望能够跳过最后两个调用,因为它们没用)。这就是我使用 Queue
而不是直接从每个方法调用内部安排任务的原因。
这是我目前的情况:
// Semaphore to synchronize the queue access
private readonly SemaphoreSlim QueueSemaphore = new SemaphoreSlim(1);
// Arguments queue
private readonly Queue<String> History = new Queue<String>();
// The last argument (needed to understand the useless calls)
private String _LastArgument;
protected void Foo(String arg)
{
// Wait and add the argument to the queue
QueueSemaphore.WaitAsync().ContinueWith(ts =>
{
History.Enqueue(arg);
QueueSemaphore.Release();
// Small delay (100ms are enough in this case)
Task.Delay(100).ContinueWith(td =>
{
// Wait again before accessing the queue
QueueSemaphore.WaitAsync().ContinueWith(tf =>
{
// Edge case for the removed calls
if (History.Count == 0)
{
QueueSemaphore.Release();
return;
}
// Get the next argument and test it
String next = History.Dequeue();
if (_LastArgument != null &&
History.Count > 0 &&
History.Peek().Equals(_LastArgument))
{
// Useless calls detected, skip them
StatesHistory.Dequeue();
QueueSemaphore.Release();
return;
}
_LastArgument= next;
SomeOtherMethodWithTheActualCode(next);
QueueSemaphore.Release();
}, TaskContinuationOptions.PreferFairness);
}, TaskContinuationOptions.PreferFairness);
}, TaskContinuationOptions.PreferFairness);
}
现在,我有两个问题:
- 根据我在文档中看到的内容,
TaskContinuationOptions.PreferFairness
标志只会尝试 来保持计划任务的原始顺序,但不保证它们将按相同的顺序执行 - 我多次听说
Task.Delay
方法不可靠,不应将其用于同步目的。因此,例如,如果延迟调用实际上需要 101 毫秒,而另一个延迟调用需要 99 毫秒或 98 毫秒,那可能会把整个事情搞砸。 - 由于此方法将在UI线程上执行,我想避免同步阻塞以避免UI 出现问题
感谢您的帮助!
我不确定我是否理解你的意思,但你可能想尝试异步循环(下面是一些伪代码):
正在向队列中添加项目:
async Task AddToQueueAsync(WorkItem item)
{
await LockQueueAsync();
queue.Add(item);
UnlockQueue();
}
并在无限循环中获取项目:
async Task InfiniteLoopThatExecutesTasksOneByOne()
{
while(true)
{
WorkItem item = null;
await LockQueueAsync();
item = InspectTheQueueAndSkipSomethingIfNeeded();
UnlockQueue();
if(item!=null)
await DispatchItemToUIThread(item);
await Task.Delay(delay);
}
}
有了循环,您的商品将始终被订购。作为一个缺点,您将拥有一些无限工作的代码,因此您需要某种机制在需要时 suspend/resume 它。此外,它不涵盖您的第三个问题,我目前想不出任何方法来异步实现精确延迟。
附带说明:您可以保留您安排的最后一个任务,并将新任务附加为之前的任务延续。这样你也可以保持秩序。
我认为只保留参数集合会容易得多:
public Task Execution { get; private set; } = StartAsync();
private List<string> _requests = new List<string>();
private string _currentRequest;
private async Task StartAsync()
{
while (true)
{
if (_requests.Count != 0)
{
_currentRequest = _requests[0];
_request.RemoveAt(0);
SomeOtherMethodWithTheActualCode(_currentRequest); // TODO: error handling
_currentRequest = null;
}
await Task.Delay(100);
}
}
protected void Foo(String arg)
{
var index = _requests.IndexOf(arg);
if (index != -1)
_requests.RemoveRange(index, _requests.Count - index);
else if (arg == _currentRequest)
_requests.Clear();
_requests.Add(arg);
}
此代码假定类型是从 UI 线程创建的(并调用了 Foo
)。