如何取消 N 个任务中的一个特定任务?
How to cancel one particular task out of N number of tasks?
我有 N 个任务有自己的 CancellationTokenSource。为了跟踪每个任务及其 CancellationTokenSource,我使用了 ConcurrentDictionary。因此,每当我需要取消特定任务时,我都会从 ConcurrentDictionary 获取该任务的 CancellationTokenSource 并取消它。
问题是每当我取消一项任务时,所有其他任务也会被取消。有什么我在这里遗漏的或比这更好的实现吗?下面的示例代码。
CancellationTokenSource _serviceCancelSource;
public async Task Start()
{
var services = _serviceRepository.GetAll();
await Task.Run(() =>
{
Parallel.ForEach(services, x => Task.Run(() => Start(x)));
});
}
public async Task Start(Service service)
{
_serviceCancelSource = new CancellationTokenSource();
myConcurrentDictionary.AddOrUpdate(service.Id, _serviceCancelSource, (key, oldValue) => _serviceCancelSource));
var manager = ServiceManagerFactory.Create(service);
Action serviceAction = () => manager.InitializeTaskAsync();
await Task.Run(() => PeriodicTaskFactory.Start(serviceAction, intervalInMilliseconds: service.PollTime, cancelToken: _serviceCancelSource.Token, serviceName: service.ServiceName));
}
public async Task CancelTask()
{
await Task.Run(() =>
{
var serviceId = GetChangedService();
CancellationTokenSource cancelSource;
if (myConcurrentDictionary.TryGetValue(serviceId, out cancelSource))
{
cancelSource.Cancel();
}
});
}
我在代码中看到的一个问题是共享变量“_serviceCancelSource”。
上面代码中的 Parallel.Foreach 将在多个线程上并行调用 "Start(service)" 方法,因此语句“_serviceCancelSource = new CancellationTokenSource();”在所有线程上几乎同时执行。
请记住,“_serviceCancelSource”是一个在所有线程之间共享的变量。
当控件到达 "myConcurrentDictionary.AddOrUpdate()" 时(可能有一些线程安全锁定机制),共享变量“_serviceCancelSource”具有来自最后完成的线程的值。
因此,并发字典最终对所有 "Service.ids" 具有相同的 CancellationTokenSource 实例,这可能是取消一个任务会取消所有其他任务的原因。
如果您将新的取消标记分配给局部变量,然后将其添加到并发字典中,能否请您告诉我们结果。
我有 N 个任务有自己的 CancellationTokenSource。为了跟踪每个任务及其 CancellationTokenSource,我使用了 ConcurrentDictionary。因此,每当我需要取消特定任务时,我都会从 ConcurrentDictionary 获取该任务的 CancellationTokenSource 并取消它。
问题是每当我取消一项任务时,所有其他任务也会被取消。有什么我在这里遗漏的或比这更好的实现吗?下面的示例代码。
CancellationTokenSource _serviceCancelSource;
public async Task Start()
{
var services = _serviceRepository.GetAll();
await Task.Run(() =>
{
Parallel.ForEach(services, x => Task.Run(() => Start(x)));
});
}
public async Task Start(Service service)
{
_serviceCancelSource = new CancellationTokenSource();
myConcurrentDictionary.AddOrUpdate(service.Id, _serviceCancelSource, (key, oldValue) => _serviceCancelSource));
var manager = ServiceManagerFactory.Create(service);
Action serviceAction = () => manager.InitializeTaskAsync();
await Task.Run(() => PeriodicTaskFactory.Start(serviceAction, intervalInMilliseconds: service.PollTime, cancelToken: _serviceCancelSource.Token, serviceName: service.ServiceName));
}
public async Task CancelTask()
{
await Task.Run(() =>
{
var serviceId = GetChangedService();
CancellationTokenSource cancelSource;
if (myConcurrentDictionary.TryGetValue(serviceId, out cancelSource))
{
cancelSource.Cancel();
}
});
}
我在代码中看到的一个问题是共享变量“_serviceCancelSource”。
上面代码中的 Parallel.Foreach 将在多个线程上并行调用 "Start(service)" 方法,因此语句“_serviceCancelSource = new CancellationTokenSource();”在所有线程上几乎同时执行。 请记住,“_serviceCancelSource”是一个在所有线程之间共享的变量。
当控件到达 "myConcurrentDictionary.AddOrUpdate()" 时(可能有一些线程安全锁定机制),共享变量“_serviceCancelSource”具有来自最后完成的线程的值。
因此,并发字典最终对所有 "Service.ids" 具有相同的 CancellationTokenSource 实例,这可能是取消一个任务会取消所有其他任务的原因。
如果您将新的取消标记分配给局部变量,然后将其添加到并发字典中,能否请您告诉我们结果。