如何同步延迟的计划任务以公平(按顺序)执行它们?

How to synchronize delayed scheduled tasks to execute them fairly (in order)?

我正在编写一种方法来安排一些任务,这些任务需要在创建时以正确的顺序执行,每个任务都有固定的延迟。

基本上,我需要复制这个行为(其中Mx是一个调度任务的方法调用,Tx是它对应的任务):

M1--M2--M3--M4--M5--M6...

------T1--T2--T3--T4--T5...

这里的重点是这个方法被调用得非常频繁(不是我,所以我无法控制)并且我需要能够如果使用两个相互“无效”的参数调用它,则跳过它的执行(例如,如果最后一次调用是 Foo("a"),那么我得到 Foo("b") 和 Foo( “a”),我希望能够跳过最后两个调用,因为它们没用)。这就是我使用 Queue 而不是直接从每个方法调用内部安排任务的原因。

这是我目前的情况:

// Semaphore to synchronize the queue access
private readonly SemaphoreSlim QueueSemaphore = new SemaphoreSlim(1);

// Arguments queue
private readonly Queue<String> History = new Queue<String>();

// The last argument (needed to understand the useless calls)
private String _LastArgument;

protected void Foo(String arg)
{
    // Wait and add the argument to the queue
    QueueSemaphore.WaitAsync().ContinueWith(ts =>
    {
        History.Enqueue(arg);
        QueueSemaphore.Release();

        // Small delay (100ms are enough in this case)
        Task.Delay(100).ContinueWith(td =>
        {
            // Wait again before accessing the queue
            QueueSemaphore.WaitAsync().ContinueWith(tf =>
            {
                // Edge case for the removed calls
                if (History.Count == 0) 
                {
                    QueueSemaphore.Release();
                    return;
                }     

                // Get the next argument and test it               
                String next = History.Dequeue();
                if (_LastArgument != null && 
                    History.Count > 0 && 
                    History.Peek().Equals(_LastArgument))
                {
                    // Useless calls detected, skip them
                    StatesHistory.Dequeue();
                    QueueSemaphore.Release();
                    return;
                }
                _LastArgument= next;
                SomeOtherMethodWithTheActualCode(next);
                QueueSemaphore.Release();
            }, TaskContinuationOptions.PreferFairness);
        }, TaskContinuationOptions.PreferFairness);
    }, TaskContinuationOptions.PreferFairness);
}

现在,我有两个问题:

  1. 根据我在文档中看到的内容,TaskContinuationOptions.PreferFairness 标志只会尝试 来保持计划任务的原始顺序,但不保证它们将按相同的顺序执行
  2. 我多次听说 Task.Delay 方法不可靠,不应将其用于同步目的。因此,例如,如果延迟调用实际上需要 101 毫秒,而另一个延迟调用需要 99 毫秒或 98 毫秒,那可能会把整个事情搞砸。
  3. 由于此方法将在UI线程上执行,我想避免同步阻塞以避免UI
  4. 出现问题

感谢您的帮助!

我不确定我是否理解你的意思,但你可能想尝试异步循环(下面是一些伪代码):

正在向队列中添加项目:

async Task AddToQueueAsync(WorkItem item)
{
    await LockQueueAsync();
    queue.Add(item);
    UnlockQueue();
}

并在无限循环中获取项目:

async Task InfiniteLoopThatExecutesTasksOneByOne()
{
    while(true)
    {
        WorkItem item = null;
        await LockQueueAsync();
        item = InspectTheQueueAndSkipSomethingIfNeeded();
        UnlockQueue();
        if(item!=null)
        await DispatchItemToUIThread(item);
        await Task.Delay(delay);
    }
}

有了循环,您的商品将始终被订购。作为一个缺点,您将拥有一些无限工作的代码,因此您需要某种机制在需要时 suspend/resume 它。此外,它不涵盖您的第三个问题,我目前想不出任何方法来异步实现精确延迟。

附带说明:您可以保留您安排的最后一个任务,并将新任务附加为之前的任务延续。这样你也可以保持秩序。

我认为只保留参数集合会容易得多:

public Task Execution { get; private set; } = StartAsync();

private List<string> _requests = new List<string>();
private string _currentRequest;

private async Task StartAsync()
{
  while (true)
  {
    if (_requests.Count != 0)
    {
      _currentRequest = _requests[0];
      _request.RemoveAt(0);
      SomeOtherMethodWithTheActualCode(_currentRequest); // TODO: error handling
      _currentRequest = null;
    }
    await Task.Delay(100);
  }
}

protected void Foo(String arg)
{
  var index = _requests.IndexOf(arg);
  if (index != -1)
    _requests.RemoveRange(index, _requests.Count - index);
  else if (arg == _currentRequest)
    _requests.Clear();
  _requests.Add(arg);
}

此代码假定类型是从 UI 线程创建的(并调用了 Foo)。