Parallel.ForEach 如何处理取消或 ThrowIfCancellationRequested() 和异常

How does Parallel.ForEach handles cancellation or ThrowIfCancellationRequested() and exceptions

我创建了一个 WPF 应用程序来查看 TPL 的工作原理,但我对我的输出感到困惑。下面是我的代码:

// Two buttons, 'Process' button and 'Cancel' button
public partial class MainWindow : Window 
{
   private CancellationTokenSource cancelToken = new CancellationTokenSource();
   public MainWindow()
   {
      InitializeComponent();
   }
   //...

   private void cmdProcess_Click(object sender, EventArgs e)  // Sequence A
   {
      Task.Factory.StartNew(() => ProcessFiles()); 
   }

    private void cmdCancel_Click(object sender, EventArgs e)   //Sequence B
   {
      cancelToken.Cancel();  
   }

   private void ProcessFiles() 
   {
      ParallelOptions parOpts = new ParallelOptions();
      parOpts.CancellationToken = cancelToken.Token;
      parOpts.MaxDegreeOfParallelism = System.Environment.ProcessorCount;

      string[] files = { "first", "second" };
      try
      {
         Parallel.ForEach(files, parOpts, currentFile =>
         {
            parOpts.CancellationToken.ThrowIfCancellationRequested();  //Sequence C
            Thread.Sleep(5000);
         });
      }
      catch (OperationCanceledException ex)
      { 
         MessageBox.Show("Caught");
      }
   }

}

当我按下 click 按钮然后快速按下 cancel 按钮时,"Caught" 消息框只弹出一次,而不是两次。

假设主线程id是1,工作线程是2和3 所以我有两个问题:

Q1-当我按下cancel按钮时,工作线程2和3已经执行了'parOpts.CancellationToken.ThrowIfCancellationRequested();'(当然我的鼠标点击不能和线程的执行速度一样快)。当他们执行ThrowIfCancellationRequested的时候,cancelToken还没有被取消,也就是说线程2和线程3的取消按钮没有被点击。那么为什么那些工作线程仍然抛出异常?

Q2-为什么我只得到一个弹出消息框,不应该是两个,一个线程2,一个线程3?

Q3- 我将 Parallel.ForEach 修改为:

try
{
   Parallel.ForEach(files, parOpts, currentFile =>
   {
      Thread.Sleep(5000);
      parOpts.CancellationToken.ThrowIfCancellationRequested(); 

   });
}
catch (OperationCanceledException ex)
{ 
   MessageBox.Show("Caught");
}

现在我可以在工作线程到达 ThrowIfCancellationRequested() 之前按下取消按钮,但我仍然只收到主线程抛出的一个异常。但是我按下了cancal按钮,token已经被设置为cancel了,所以当secondary worker thread到达parOpts.CancellationToken.ThrowIfCancellationRequested();的时候,它不应该也抛出异常吗?并且这个异常不能被主线程中的 try catch 处理(每个线程都有自己的堆栈),所以我应该得到一个未处理的异常来停止应用程序,但事实并非如此,我只是得到一个主线程抛出的异常,这个异常是主线程还是工作线程抛出的?

Q4-我修改代码为:

private void ProcessFilesz()
{
    ParallelOptions parOpts = new ParallelOptions();
    parOpts.CancellationToken = cancelToken.Token;
    parOpts.MaxDegreeOfParallelism = System.Environment.ProcessorCount;

    cancelToken.Cancel(); // cancel here
    string[] files = { "first", "second" };
    try
    {
        Parallel.ForEach(files, parOpts, currentFile =>
        {
            MessageBox.Show("Underline Thread is " + Thread.CurrentThread.ManagedThreadId.ToString());
            parOpts.CancellationToken.ThrowIfCancellationRequested();
        });

    }
    catch (OperationCanceledException ex)
    {
        MessageBox.Show("catch");
    }
}

又奇怪了,没有弹出消息框,即使token设置为cancel,但是MessageBox.Show(...)语句在parOpts.CancellationToken.ThrowIfCancellationRequested();语句之上,所以MessageBox.Show() 应该先执行,但为什么根本不执行呢?或 CLR 将 parOpts.CancellationToken.ThrowIfCancellationRequested(); 提升到顶部以隐式地成为第一个语句?

Q5- 我将代码修改为:

try
{
   Parallel.ForEach(files, parOpts, currentFile =>
   {
      Thread.Sleep(5000); // I pressed the cancel button on the main thread when the worker thread is sleeping
   });
}
catch (OperationCanceledException ex)
{
   MessageBox.Show("Caught");
}

所以我有足够的时间按下取消按钮,有一条"catch"消息,但为什么仍然有异常?现在我明白 Parallel.ForEach 在所有耗费资源的操作之前检查 CancellationToken.IsCancellationRequested`,这是否意味着 Parallel.ForEach 将在执行完所有语句后检查 IsCancellationRequested?我的意思是 Parallel.ForEach 将检查 IsCancellationRequested 两次,一次在第一个语句之前,一次在最后一个语句之后?

Parallel.ForEach 如何处理取消##

您的观察是正确的。但一切都表现正常。由于 ParallelOptions.CancellationToken 属性 已设置,一旦 CancellationToken.IsCancellationRequested 计算结果为真,Parallel.ForEach 将抛出 OperationCanceledException

所有支持取消的框架 类 都是这样的(例如 Task.Run)。在执行任何昂贵的资源分配(内存或时间昂贵)之前,框架会在执行期间多次检查取消标记以提高效率。 Parallel.ForEach 例如由于所有线程管理,必须进行许多这种昂贵的资源分配。在 each 分配步骤(例如初始化、生成工作线程或分叉、应用分区程序、调用操作等)之前,再次评估 CancellationToken.IsCancelRequested

最后一个内部 Parallel.ForEach 步骤是在创建 ParallelLoopResultParallel.ForEach 的 return 值)之前加入线程。在此操作之前,将再次评估 CancellationToken.IsCancellationRequested。由于您在执行 Thread.Sleep(5000) 时取消了 Parallel.ForEach 的执行,因此您必须等待最长 5 秒的时间,直到框架重新检查此 属性 并可以抛出 OperationCanceledException.你可以测试一下。 Thread.Sleep(x) 需要 x/1000 秒才能显示 MessageBox

另一个取消 Parallel.ForEach 的机会委托给消费者。很可能消费者的动作很长运行,因此需要在Parallel.ForEach结束前取消。如您所知,可以通过(重复)调用 CancellationToken.ThrowIfCancellationRequested() 强制提前取消,这一次将使 CancellationToken 抛出 OperationCanceledException(而不是 Parallel.ForEach)。

回答你的最后一个问题,为什么你只会看到 一个 MessageBox:在你的特殊情况下,你已经注意到,你点击取消按钮的速度太慢了在代码到达 CancellationToken.ThrowIfCancellationRequested() 之前,但能够在线程从睡眠中唤醒之前单击它。因此 Parallel.ForEach 抛出异常(在加入线程和创建 ParallelLoopResult 之前)。所以抛出 one 异常。但是,即使您足够快地在到达 CancellationToken.ThrowIfCancellationRequested() 之前取消循环,仍然只有一个 MessageBox 显示,因为一旦抛出未捕获的异常,循环就会中止所有线程。要允许每个线程抛出异常,您必须捕获每个异常并累积它们,然后再将它们包裹在 AggregateException 中。请参阅:Microsoft Docs: How to Handle Exceptions in Parallel Loops 了解更多详情。


编辑以回答 follow-up 问题:

For Q2, I just realized each thread has its own stack, so it won't know that it is surrounded by a try catch block, that's why there is only one exception(thrown by primary thread), is my understanding correct?

您说每个线程都有其专用的调用堆栈是对的。但是,当您编写应该并发执行的代码时,会在堆上为每个线程创建所有局部变量的副本。 try-catch 块也是如此。 Catch 指示编译器定义一个处理程序(指令指针),然后通过 try 指令将其注册到异常处理程序 table。 table 由 OS 管理。异常 table 将每个处理程序映射到一个异常。每个异常都映射到一个调用堆栈。因此异常和捕获处理程序仅限于显式调用堆栈。由于处理程序可以访问线程本地内存,因此它也必须是一个副本。这意味着每个线程都是其 catch 个处理程序的 'aware' 个。

由于专用的调用堆栈以及异常到调用堆栈和捕获处理程序到异常(因此也到调用堆栈)的独占映射,线程范围(调用堆栈)中抛出的任何异常都不能超出线程范围(使用 Thread 时)。在这种情况下,范围意味着它由调用堆栈(及其调用帧)描述的地址 space。除非不直接在线程本身中捕获,否则会使应用程序崩溃。 Task(当使用 Task.Waitawait 等待时)相反,吞下所有异常并将它们包装在 AggregateException.

DoParallel()抛出的异常不会被捕获:

try 
{
  Thread thread = new Thread(() => DoParallel());
  thread.Start();
}
catch (Exception ex)
{
  // Unreachable code
}

但是在下面的两个例子中,两个catch处理程序都被调用来处理异常:

try 
{
  await Task.Run(() => DoParallel());
}
catch (AggregateException ex)
{
  // Reachable code
}

try 
{
  var task = new Task(() => DoParallel());
  task.Start();
  task.Wait();
}
catch (AggregateException ex)
{
  // Reachable code
}

最后两个示例使用 Task Parallel Library - TPL,它使用 SynchronizationContext 允许线程共享上下文,因此例如在线程之间传播异常。由于 Parallel.ForEach 使用 Task.Wait() (TPL),它能够捕获工作线程的异常(如果您尚未在操作中捕获它),以执行一些清理(取消其他工作线程和处理内部资源),然后最终将 OperationCanceledException 传播到外部范围。

所以因为抛出异常,

  • OS 中断应用程序并检查异常 table 以查找由 try 指令映射到此线程的潜在处理程序。
  • 它找到一个并重建上下文以执行 catch 处理程序(在您的例子中,下一个 catch 处理程序是 Parallel.ForEach 的内部处理程序)。应用程序仍处于暂停状态 - 其他 线还在停车。
  • Parallel.ForEach 处理程序执行清理并结束其他处理程序 threads before 应用程序继续,因此 before 任何工作线程 可以自己抛出额外的异常。
  • 应用程序继续执行 Parallel.ForEach catch 的重新 throw 处理程序。
  • 应用程序再次停止寻找外部作用域(Parallel.ForEach 的消费者作用域) catch 处理程序。
  • 如果 none 是使用 try 注册的,应用程序将因错误而终止。

这就是为什么 Parallel.ForEach.

总是抛出 一个 异常的原因

编辑以回答 follow-up 问题 Q3:

now I can press cancel button before worker thread reaches to ThrowIfCancellationRequested(), but I still get only one exception thrown by the primary thread. BUt I pressed the cancal button, token has been set to cancel, so when the secondary worker thread reaches to parOpts.CancellationToken.ThrowIfCancellationRequested();, shouldn't it throw an exception too? and this exception cannot be handled by the try catch in the primary thread(each thread has its own stack), so I should get an unhandled exception to halt the application, but it wasn't, I just get one exception thrown by primary thread, and is this exception thrown by primary thread or worker thread

对于以下场景:

try
{
   Parallel.ForEach(files, parOpts, currentFile =>
   {
      Thread.Sleep(5000);
      parOpts.CancellationToken.ThrowIfCancellationRequested(); 

   });
}
catch (OperationCanceledException ex)
{ 
   MessageBox.Show("Caught");
}

由于在这种情况下您可以在 Parallel.ForEach 完成之前取消它,因此在工作线程(执行您的操作委托)上会生成异常,在执行 CancellationToken.ThrowIfCancellationRequested() 的那一刻。在引擎盖下,CancellationToken.ThrowIfCancellationRequested() 方法看起来像:

public void ThrowIfCancellationRequested()
{
  if (IsCancellationRequested) 
    ThrowOperationCanceledException();
}

// Throws an OCE; separated out to enable better inlining of ThrowIfCancellationRequested
private void ThrowOperationCanceledException()
{
  throw new OperationCanceledException(Environment.GetResourceString("OperationCanceled"), this);
}

正如我之前提到的,Parallel.ForEach 使用 TaskTask.Wait() (_TPL_) 来处理线程,因此使用 SynchronizationContext。在TPL(或SynchronizationContext)的场景中,线程上下文是共享的,不再是孤立的(与Thread线程相反)。这允许 Parallel.ForEach 捕获子线程抛出的异常。

这意味着,Parallel.ForEach 中没有未处理的异常,因为正如您在异常流程的 step-by-step 解释中所读到的,Parallel.ForEach 在内部捕获了所有异常(由于 TPL) 可能会清理和处理分配的资源,然后最终 re-throw OperationCanceledException.

当检查您的 Q3 代码示例的异常调用堆栈时,您会看到起源是工作线程而不是 'primary' Parallel.ForEach 线程。您刚刚在主线程中捕获了异常,因为它包含最接近原点的 catch 处理程序 - 工作线程。因此,主线程可以在不取消的情况下完成。


Parallel.ForEach 和线程

我认为你的理解是错误的:

...the primary thread is also executing the statements in Parallel.ForEach, isn't it? I have a typo in the post, there is only two active threads, not three. the string[] just have two elements, so primary thread takes "first" to process and one worker thread takes "two" to process...

这不是真的。明确地说:您初始示例中的数组包含两个应该模拟工作负载的字符串,对吗?主线程是您为使用 Task.Factory.StartNew(() => ProcessFiles()); 执行 Parallel.ForEach 循环而创建的线程。这是为了在长时间 运行 Parallel.ForEach 期间保持 UI 线程响应的常见做法。 Parallel.ForEach 因此在主线程上执行,并且 可能 创建两个工作线程 - 每个负载(或字符串)一个。 可能 因为 Parallel.ForEach 实际上使用 任务 ,它们由 线程 支持。最大 线程 数量受处理器数量和 TaskScheduler 限制。由于框架执行的性能优化,tasks 的实际数量不能匹配迭代项的数量或 MaxDegreeOfParallelism.

的值

The Parallel.ForEach method may use more tasks than threads over the lifetime of its execution, as existing tasks complete and are replaced by new tasks. This gives the underlying TaskScheduler object the chance to add, change, or remove threads that service the loop. may decide to execute the action delegates on fewer threads then the MaxDegreeOfParallelism allows. (source: Microsoft Docs: Parallel.ForEach)


归纳总结

假设设置了ParallelOptions.CancellationToken属性,有两种可能的情况:

第一种情况:您确实在请求取消后在您的操作委托中调用了CancellationToken.ThrowIfCancellationRequested(),但是before Parallel.ForEach 内部计算 CancellationToken.IsCancellationRequested。现在,如果您用 try-catch 包围您的操作代码,则不会有异常离开工作线程。如果没有这样的 try-catchParallel.ForEach 将在内部捕获此异常(进行一些清理)。这将在主线程上。 re-thrown 在 Parallel.ForEach 处理分配的资源后出现此异常。因为你在worker上调用了CancellationToken.ThrowIfCancellationRequested(),所以origin还是这个worker线程。除了取消请求,任何异常都可以随时停止 Parallel.ForEach 的执行。

第二种情况:没有在您的操作委托中显式调用CancellationToken.ThrowIfCancellationRequested()或发生取消调用CancellationToken.ThrowIfCancellationRequested()方法后,下一次Parallel.ForEach内部检查CancellationToken.IsCancelRequested时,Parallel.ForEach将抛出异常。 Parallel.ForEach 总是在分配任何资源之前评估 CancellationToken.IsCancelRequested。由于 Parallel.ForEach 在主线程上执行,因此此异常的来源当然是主线程。除了取消请求,任何异常都可以随时停止 Parallel.ForEach 的执行。

如果未设置 ParallelOptions.CancellationToken 属性,则不会发生 CancellationToken.IsCancelRequested 的内部 Parallel.ForEach 计算。在 CancellationToken.Cancel() 请求的情况下,Parallel.ForEach 无法做出反应并将继续其资源密集型工作,除非 调用 [=] 不会抛出异常34=]。除了取消请求,任何异常都可以随时停止 Parallel.ForEach 的执行。