如何重新启动异步方法?取消之前的运行,等待再启动
How to restart a async method? Cancel previous run, await it and then start it
我有一个方法 RestartAsync
,它启动了一个方法 DoSomethingAsync
。当再次调用 RestartAsync
时,它应该取消 DoSomethingAsync
并等待直到它完成(DoSomethingAsync
不能同步取消,并且它不应该在前一个任务仍在进行时调用)。
我的第一个方法是这样的:
public async Task RestartTest()
{
Task[] allTasks = { RestartAsync(), RestartAsync(), RestartAsync() } ;
await Task.WhenAll(allTasks);
}
private async Task RestartAsync()
{
_cts.Cancel();
_cts = new CancellationTokenSource();
await _somethingIsRunningTask;
_somethingIsRunningTask = DoSomethingAsync(_cts.Token);
await _somethingIsRunningTask;
}
private static int _numberOfStarts;
private async Task DoSomethingAsync(CancellationToken cancellationToken)
{
_numberOfStarts++;
int numberOfStarts = _numberOfStarts;
try
{
Console.WriteLine(numberOfStarts + " Start to do something...");
await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
Console.WriteLine(numberOfStarts + " Finished to do something...");
}
catch (OperationCanceledException)
{
Console.WriteLine(numberOfStarts + " Cancelled to do something...");
}
}
三次调用 RestartAsync 时的实际输出如下所示(注意第二次 运行 正在取消并等待第一次,但同时第三次 运行 也在等待第一次而不是取消并等待第二个):
1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
3 Start to do something...
2 Finished to do something...
3 Finished to do something...
但是我想要实现的是这个输出:
1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Cancelled to do something...
3 Start to do something...
3 Finished to do something...
我目前的解决方案如下:
private async Task RestartAsync()
{
if (_isRestarting)
{
return;
}
_cts.Cancel();
_cts = new CancellationTokenSource();
_isRestarting = true;
await _somethingIsRunningTask;
_isRestarting = false;
_somethingIsRunningTask = DoSomethingAsync(_cts.Token);
await _somethingIsRunningTask;
}
然后我得到这个输出:
1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Finished to do something...
现在至少 DoSomethingAsync
还没有开始,而它还在进行中(注意第三个 运行 被忽略了,这并不重要,因为它应该取消第二个 运行 否则)。
但是这个解决方案感觉不太好,我必须在任何需要这种行为的地方重复这个丑陋的模式。 这种重启机制有什么好的模式或框架吗?
这是一个并发问题。因此,您需要一个解决并发问题的方法:信号量。
在一般情况下,您还应该考虑正在运行的方法何时抛出 OperationCanceledException
:
private async Task DoSomethingAsync(CancellationToken cancellationToken)
{
_numberOfStarts++;
int numberOfStarts = _numberOfStarts;
try
{
Console.WriteLine(numberOfStarts + " Start to do something...");
await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
Console.WriteLine(numberOfStarts + " Finished to do something...");
}
catch (OperationCanceledException)
{
Console.WriteLine(numberOfStarts + " Cancelled to do something...");
throw;
}
}
试试这个:
private SemaphoreSlim semaphore = new SemaphoreSlim(1);
private (CancellationTokenSource cts, Task task)? state;
private async Task RestartAsync()
{
Task task = null;
await this.semaphore.WaitAsync();
try
{
if (this.state.HasValue)
{
this.state.Value.cts.Cancel();
this.state.Value.cts.Dispose();
try
{
await this.state.Value.task;
}
catch (OperationCanceledException)
{
}
this.state = null;
}
var cts = new CancellationTokenSource();
task = DoSomethingAsync(cts.Token);
this.state = (cts, task);
}
finally
{
this.semaphore.Release();
}
try
{
await task;
}
catch (OperationCanceledException)
{
}
}
我认为问题出在 RestartAsync 方法内部。请注意,异步方法将立即 return 一个任务,如果它要等待某事,因此第二个 RestartAsync 实际上 return 在它交换其任务之前,然后第三个 RestartAsync 进入并首先等待任务 RestartAsync。
此外,如果 RestartAsync 将由多线程执行,您可能希望将 _cts 和 _somethingIsRunningTask 包装成一个并使用 Interlocked.Exchange 方法交换值以防止竞争条件。
这是我的示例代码,未经过全面测试:
public class Program
{
static async Task Main(string[] args)
{
RestartTaskDemo restartTaskDemo = new RestartTaskDemo();
Task[] tasks = { restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ) };
await Task.WhenAll( tasks );
Console.ReadLine();
}
}
public class RestartTaskDemo
{
private int Counter = 0;
private TaskEntry PreviousTask = new TaskEntry( Task.CompletedTask, new CancellationTokenSource() );
public async Task RestartAsync( int delay )
{
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
TaskEntry previousTaskEntry = Interlocked.Exchange( ref PreviousTask, new TaskEntry( taskCompletionSource.Task, cancellationTokenSource ) );
previousTaskEntry.CancellationTokenSource.Cancel();
await previousTaskEntry.Task.ContinueWith( Continue );
async Task Continue( Task previousTask )
{
try
{
await DoworkAsync( delay, cancellationTokenSource.Token );
taskCompletionSource.TrySetResult( true );
}
catch( TaskCanceledException )
{
taskCompletionSource.TrySetCanceled();
}
}
}
private async Task DoworkAsync( int delay, CancellationToken cancellationToken )
{
int count = Interlocked.Increment( ref Counter );
Console.WriteLine( $"Task {count} started." );
try
{
await Task.Delay( delay, cancellationToken );
Console.WriteLine( $"Task {count} finished." );
}
catch( TaskCanceledException )
{
Console.WriteLine( $"Task {count} cancelled." );
throw;
}
}
private class TaskEntry
{
public Task Task { get; }
public CancellationTokenSource CancellationTokenSource { get; }
public TaskEntry( Task task, CancellationTokenSource cancellationTokenSource )
{
Task = task;
CancellationTokenSource = cancellationTokenSource;
}
}
}
我有一个方法 RestartAsync
,它启动了一个方法 DoSomethingAsync
。当再次调用 RestartAsync
时,它应该取消 DoSomethingAsync
并等待直到它完成(DoSomethingAsync
不能同步取消,并且它不应该在前一个任务仍在进行时调用)。
我的第一个方法是这样的:
public async Task RestartTest()
{
Task[] allTasks = { RestartAsync(), RestartAsync(), RestartAsync() } ;
await Task.WhenAll(allTasks);
}
private async Task RestartAsync()
{
_cts.Cancel();
_cts = new CancellationTokenSource();
await _somethingIsRunningTask;
_somethingIsRunningTask = DoSomethingAsync(_cts.Token);
await _somethingIsRunningTask;
}
private static int _numberOfStarts;
private async Task DoSomethingAsync(CancellationToken cancellationToken)
{
_numberOfStarts++;
int numberOfStarts = _numberOfStarts;
try
{
Console.WriteLine(numberOfStarts + " Start to do something...");
await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
Console.WriteLine(numberOfStarts + " Finished to do something...");
}
catch (OperationCanceledException)
{
Console.WriteLine(numberOfStarts + " Cancelled to do something...");
}
}
三次调用 RestartAsync 时的实际输出如下所示(注意第二次 运行 正在取消并等待第一次,但同时第三次 运行 也在等待第一次而不是取消并等待第二个):
1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
3 Start to do something...
2 Finished to do something...
3 Finished to do something...
但是我想要实现的是这个输出:
1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Cancelled to do something...
3 Start to do something...
3 Finished to do something...
我目前的解决方案如下:
private async Task RestartAsync()
{
if (_isRestarting)
{
return;
}
_cts.Cancel();
_cts = new CancellationTokenSource();
_isRestarting = true;
await _somethingIsRunningTask;
_isRestarting = false;
_somethingIsRunningTask = DoSomethingAsync(_cts.Token);
await _somethingIsRunningTask;
}
然后我得到这个输出:
1 Start to do something...
1 Cancelled to do something...
2 Start to do something...
2 Finished to do something...
现在至少 DoSomethingAsync
还没有开始,而它还在进行中(注意第三个 运行 被忽略了,这并不重要,因为它应该取消第二个 运行 否则)。
但是这个解决方案感觉不太好,我必须在任何需要这种行为的地方重复这个丑陋的模式。 这种重启机制有什么好的模式或框架吗?
这是一个并发问题。因此,您需要一个解决并发问题的方法:信号量。
在一般情况下,您还应该考虑正在运行的方法何时抛出 OperationCanceledException
:
private async Task DoSomethingAsync(CancellationToken cancellationToken)
{
_numberOfStarts++;
int numberOfStarts = _numberOfStarts;
try
{
Console.WriteLine(numberOfStarts + " Start to do something...");
await Task.Delay(TimeSpan.FromSeconds(1)); // This operation can not be cancelled.
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
Console.WriteLine(numberOfStarts + " Finished to do something...");
}
catch (OperationCanceledException)
{
Console.WriteLine(numberOfStarts + " Cancelled to do something...");
throw;
}
}
试试这个:
private SemaphoreSlim semaphore = new SemaphoreSlim(1);
private (CancellationTokenSource cts, Task task)? state;
private async Task RestartAsync()
{
Task task = null;
await this.semaphore.WaitAsync();
try
{
if (this.state.HasValue)
{
this.state.Value.cts.Cancel();
this.state.Value.cts.Dispose();
try
{
await this.state.Value.task;
}
catch (OperationCanceledException)
{
}
this.state = null;
}
var cts = new CancellationTokenSource();
task = DoSomethingAsync(cts.Token);
this.state = (cts, task);
}
finally
{
this.semaphore.Release();
}
try
{
await task;
}
catch (OperationCanceledException)
{
}
}
我认为问题出在 RestartAsync 方法内部。请注意,异步方法将立即 return 一个任务,如果它要等待某事,因此第二个 RestartAsync 实际上 return 在它交换其任务之前,然后第三个 RestartAsync 进入并首先等待任务 RestartAsync。
此外,如果 RestartAsync 将由多线程执行,您可能希望将 _cts 和 _somethingIsRunningTask 包装成一个并使用 Interlocked.Exchange 方法交换值以防止竞争条件。
这是我的示例代码,未经过全面测试:
public class Program
{
static async Task Main(string[] args)
{
RestartTaskDemo restartTaskDemo = new RestartTaskDemo();
Task[] tasks = { restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ), restartTaskDemo.RestartAsync( 1000 ) };
await Task.WhenAll( tasks );
Console.ReadLine();
}
}
public class RestartTaskDemo
{
private int Counter = 0;
private TaskEntry PreviousTask = new TaskEntry( Task.CompletedTask, new CancellationTokenSource() );
public async Task RestartAsync( int delay )
{
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
TaskEntry previousTaskEntry = Interlocked.Exchange( ref PreviousTask, new TaskEntry( taskCompletionSource.Task, cancellationTokenSource ) );
previousTaskEntry.CancellationTokenSource.Cancel();
await previousTaskEntry.Task.ContinueWith( Continue );
async Task Continue( Task previousTask )
{
try
{
await DoworkAsync( delay, cancellationTokenSource.Token );
taskCompletionSource.TrySetResult( true );
}
catch( TaskCanceledException )
{
taskCompletionSource.TrySetCanceled();
}
}
}
private async Task DoworkAsync( int delay, CancellationToken cancellationToken )
{
int count = Interlocked.Increment( ref Counter );
Console.WriteLine( $"Task {count} started." );
try
{
await Task.Delay( delay, cancellationToken );
Console.WriteLine( $"Task {count} finished." );
}
catch( TaskCanceledException )
{
Console.WriteLine( $"Task {count} cancelled." );
throw;
}
}
private class TaskEntry
{
public Task Task { get; }
public CancellationTokenSource CancellationTokenSource { get; }
public TaskEntry( Task task, CancellationTokenSource cancellationTokenSource )
{
Task = task;
CancellationTokenSource = cancellationTokenSource;
}
}
}