如何重新启动异步方法?取消之前的运行,等待再启动

How to restart a async method? Cancel previous run, await it and then start it

我有一个方法 RestartAsync,它启动了一个方法 DoSomethingAsync。当再次调用 RestartAsync 时,它应该取消 DoSomethingAsync 并等待直到它完成(DoSomethingAsync 不能同步取消,并且它不应该在前一个任务仍在进行时调用)。

我的第一个方法是这样的:

public async Task RestartTest()
{
    Task[] allTasks = { RestartAsync(), RestartAsync(), RestartAsync() } ;
    await Task.WhenAll(allTasks);
}

private async Task RestartAsync()
{
    _cts.Cancel();
    _cts = new CancellationTokenSource();
    await _somethingIsRunningTask;

    _somethingIsRunningTask = DoSomethingAsync(_cts.Token);

    await _somethingIsRunningTask;
}

private static int _numberOfStarts;

private async Task DoSomethingAsync(CancellationToken cancellationToken)
{
    _numberOfStarts++;
    int numberOfStarts = _numberOfStarts;

    try
    {
        Console.WriteLine(numberOfStarts + " Start to do something...");
        await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        Console.WriteLine(numberOfStarts + " Finished to do something...");
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine(numberOfStarts + " Cancelled to do something...");
    }
}

三次调用 RestartAsync 时的实际输出如下所示(注意第二次 运行 正在取消并等待第一次,但同时第三次 运行 也在等待第一次而不是取消并等待第二个):

1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
3 Start to do something...
2 Finished to do something...
3 Finished to do something...

但是我想要实现的是这个输出:

1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Cancelled to do something...
3 Start to do something...
3 Finished to do something...

我目前的解决方案如下:

private async Task RestartAsync()
{
    if (_isRestarting)
    {
        return;
    }

    _cts.Cancel();
    _cts = new CancellationTokenSource();

    _isRestarting = true;
    await _somethingIsRunningTask;
    _isRestarting = false;

    _somethingIsRunningTask = DoSomethingAsync(_cts.Token);

    await _somethingIsRunningTask;
}

然后我得到这个输出:

1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Finished to do something...

现在至少 DoSomethingAsync 还没有开始,而它还在进行中(注意第三个 运行 被忽略了,这并不重要,因为它应该取消第二个 运行 否则)。

但是这个解决方案感觉不太好,我必须在任何需要这种行为的地方重复这个丑陋的模式。 这种重启机制有什么好的模式或框架吗?

这是一个并发问题。因此,您需要一个解决并发问题的方法:信号量。

在一般情况下,您还应该考虑正在运行的方法何时抛出 OperationCanceledException:

private async Task DoSomethingAsync(CancellationToken cancellationToken)
{
    _numberOfStarts++;
    int numberOfStarts = _numberOfStarts;

    try
    {
        Console.WriteLine(numberOfStarts + " Start to do something...");
        await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        Console.WriteLine(numberOfStarts + " Finished to do something...");
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine(numberOfStarts + " Cancelled to do something...");
        throw;
    }
}

试试这个:

private SemaphoreSlim semaphore = new SemaphoreSlim(1);
private (CancellationTokenSource cts, Task task)? state;

private async Task RestartAsync()
{
    Task task = null;

    await this.semaphore.WaitAsync();

    try
    {
        if (this.state.HasValue)
        {
            this.state.Value.cts.Cancel();
            this.state.Value.cts.Dispose();

            try
            {
                await this.state.Value.task;
            }
            catch (OperationCanceledException)
            {
            }

            this.state = null;
        }

        var cts = new CancellationTokenSource();
        task = DoSomethingAsync(cts.Token);

        this.state = (cts, task);
    }
    finally
    {
        this.semaphore.Release();
    }

    try
    {
        await task;
    }
    catch (OperationCanceledException)
    {
    }
}

我认为问题出在 RestartAsync 方法内部。请注意,异步方法将立即 return 一个任务,如果它要等待某事,因此第二个 RestartAsync 实际上 return 在它交换其任务之前,然后第三个 RestartAsync 进入并首先等待任务 RestartAsync。

此外,如果 RestartAsync 将由多线程执行,您可能希望将 _cts 和 _somethingIsRunningTask 包装成一个并使用 Interlocked.Exchange 方法交换值以防止竞争条件。

这是我的示例代码,未经过全面测试:

public class Program
{
    static async Task Main(string[] args)
    {
        RestartTaskDemo restartTaskDemo = new RestartTaskDemo();

        Task[] tasks = { restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ) };
        await Task.WhenAll( tasks );

        Console.ReadLine();
    }
}

public class RestartTaskDemo
{
    private int Counter = 0;

    private TaskEntry PreviousTask = new TaskEntry( Task.CompletedTask, new CancellationTokenSource() );

    public async Task RestartAsync( int delay )
    {            
        TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        TaskEntry previousTaskEntry = Interlocked.Exchange( ref PreviousTask, new TaskEntry( taskCompletionSource.Task, cancellationTokenSource ) );

        previousTaskEntry.CancellationTokenSource.Cancel();
        await previousTaskEntry.Task.ContinueWith( Continue );

        async Task Continue( Task previousTask )
        {
            try
            {
                await DoworkAsync( delay, cancellationTokenSource.Token );
                taskCompletionSource.TrySetResult( true );
            }
            catch( TaskCanceledException )
            {
                taskCompletionSource.TrySetCanceled();
            }
        }            
    }

    private async Task DoworkAsync( int delay, CancellationToken cancellationToken )
    {
        int count = Interlocked.Increment( ref Counter );
        Console.WriteLine( $"Task {count} started." );

        try
        {
            await Task.Delay( delay, cancellationToken );
            Console.WriteLine( $"Task {count} finished." );
        }
        catch( TaskCanceledException )
        {
            Console.WriteLine( $"Task {count} cancelled." );
            throw;
        }
    }

    private class TaskEntry
    {
        public Task Task { get; }

        public CancellationTokenSource CancellationTokenSource { get; }

        public TaskEntry( Task task, CancellationTokenSource cancellationTokenSource )
        {
            Task = task;
            CancellationTokenSource = cancellationTokenSource;
        }
    }
}