并行处理混淆

Parallel processing mix up

我是 C# 编程新手。 我正在尝试使用后台工作人员从服务器列表中获取更新数量。每个服务器的结果都显示在报告进度方法的列表视图中。 我能够使用 foreach 循环成功获得结果,但是在尝试使用并行 foreach 获得相同结果时,列表视图的所有列和行都混淆了。

例如: foreach 循环的输出: 服务器名称状态更新可用

  1. server1 登录服务器失败! 0
  2. server2 更新可用 3
  3. server3 更新可用 3
  4. server4 最新 0 等等..

并行foreach的输出:

  1. server1 更新可用 1
  2. server1 登录服务器失败! 1
  3. server2 登录服务器失败! 0
  4. server3 登录服务器失败! 0
  5. server4 登录服务器失败! 0
  6. server4 更新可用 3 等等..

我尝试过锁定部分代码,也尝试过使用并发包,但未能完全解决问题。下面是 parallelforeach 代码。我做错了什么?任何建议都会有很大帮助。

Parallel.ForEach(namelist, /*new ParallelOptions { MaxDegreeOfParallelism = 4 }, */line =>
//foreach (string line in namelist)
{
      if (worker.CancellationPending)
      {
             e.Cancel = true;
             worker.ReportProgress(SysCount, obj);
      }
      else
      {
             this.SystemName = line;//file.ReadLine();
             Status.sVariables result = new Status.sVariables();
             result = OneSystem(this.SystemName);
             switch (result.BGWResult)
             {
                    case -1:
                       this.StatusString = "Login to server failed!";
                       break;
                    //other status are assigned here;
             }
             SysCount++;
             bag.Add(this);
      }
      Status returnobj;
      bag.TryTake(out returnobj);
      worker.ReportProgress(SysCount, returnobj);
      Thread.Sleep(200);
});

ReportProgress 方法:

private void backgroundWorker1_ProgressChanged(object sender, ProgressChangedEventArgs e)
    {
        if (!backgroundWorker1.CancellationPending)
        {
            Status result = (Status)e.UserState;
            Complete_label.Visible = true;
            if (listView1.InvokeRequired)
                listView1.Invoke(new MethodInvoker(delegate
                {
                    listView1.Items.Add("");
                    listView1.Items[result.SysCount - 1].SubItems.Add(result.SystemName);
                    listView1.Items[result.SysCount - 1].SubItems.Add(result.StatusString);
                    listView1.Items[result.SysCount - 1].SubItems.Add(result.AvailableUpdatesCount.ToString());

                }));
            else
            {
                try
                {
                    listView1.Items.Add("");
                    listView1.Items[result.SysCount - 1].SubItems.Add(result.SystemName);
                    listView1.Items[result.SysCount - 1].SubItems.Add(result.StatusString);
                    listView1.Items[result.SysCount - 1].SubItems.Add(result.AvailableUpdatesCount.ToString());
                }
                catch (Exception ex)
                {}
                //other stuff
           }
    }

你的结果都是混乱的,因为你正在使用并行操作写入全局状态,例如 SystemNameStatusString,因此这些全局变量的内容最终会混乱当您尝试读取和打印它们的值时。

你可以引入一个lock,但这会完全破坏Parallel.ForEach的意义。因此,要么放弃使用 Parallel.ForEach(在这种情况下似乎没有任何用处),要么您需要收集数据并确保以线程安全的方式将其发送给报告者。

为了进一步解释,让我们检查一下代码:

this.SystemName = line; // <- the worker has now written to this, which is global to all workers
...
result = OneSystem(this.SystemName); // <- another worker may have overwritten SystemName at this point
...
                   this.StatusString = "Login to server failed!"; // <- again writing to shared variable
         ...
         bag.Add(this); // <- now trying to "thread protect" already corrupted data

因此,如果您必须 运行 并行循环,则每个工作人员必须只更新自己的独立数据,然后将其推送到 GUI 编组报告方法。

真正的问题是 ListView 更新代码使用了错误的索引来更新项目。它假定 Status.SysCount 属性 包含正确的索引。如果按顺序执行,这可能是正确的,但如果执行并行运行,则会失败 - 不同的线程可以以不同的速度完成并报告乱序的进度。

只需使用 ListViewItemCollection.Add

返回的 ListViewItem 对象即可解决实际问题
private void backgroundWorker1_ProgressChanged(object sender, ProgressChangedEventArgs e)
{
    if (!backgroundWorker1.CancellationPending)
    {
        Status result = (Status)e.UserState;
        Complete_label.Visible = true;
        var newItem=listView1.Items.Add("");
        newItem.SubItems.Add(result.SystemName);
        newItem.SubItems.Add(result.StatusString);
        newItem.SubItems.Add(result.AvailableUpdatesCount.ToString());
        //other stuff
    }
}    

虽然代码有更严重的问题 - State class 尝试并行处理数据,将数据存储在自己的属性中,然后发送自己进行报告。显然,显示的数据将一直在变化。

更好的选择是在循环内创建一个新的 State 实例,或者更好的是,创建一个 class 仅用于报告:

class StatusProgress 
{
    public string SystemName{get;set;}
    public string StatusString{get;set;}
    public int AvailableUpdatesCount {get;set;}
}

....
int sysCount=0;
Parallel.ForEach(namelist, line =>
{
    var progress=new StatusProgress();
    progress.SystemName = line;//file.ReadLine();

    Status.sVariables result = new Status.sVariables();
    result = OneSystem(line);
    switch (result.BGWResult)
    {
        case -1:
            progress.StatusString = "Login to server failed!";
            break;
                //other status are assigned here;
    }
    var count=Interlocked.Increment(ref sysCount);
  }
  worker.ReportProgress(count, progress);
});

请注意,使用 Interlocked.Increment 代替 SysCount++ 来自动增加值 并且 获取增加后的值的副本。如果我不这样做,多个线程可能会在我有机会报告进度之前修改 SysCount

进度报告代码将更改为使用 StateProgress

        StatusProgress result = (StatusProgress)e.UserState;

最后,BackgroundWorker 已过时,因为任务并行库以轻量级得多的方式提供了 BGW 所做的一切,甚至更多。例如,您可以 cancel the parallel loop by using a CancellationToken and report progress in a type-safe manner using the Progress class。

.NET 中的大多数异步方法都可以识别 CancellationToken 和 Progress,这意味着您可以像 shown here 一样轻松地报告进度和取消异步任务。

代码可以这样改写:

在 UI 表格上:

private void ReportServerProgress(StatusProgress result)
{
    Complete_label.Visible = true;
    var newItem=listView1.Items.Add("");
    newItem.SubItems.Add(result.SystemName);
    newItem.SubItems.Add(result.StatusString);
    newItem.SubItems.Add(result.AvailableUpdatesCount.ToString());
    //other stuff
}   

CancellationTokenSource _cts;
Progress<StatusProgress> _progress;

public void StartProcessiong()
{
    _cts=new CancellationTokenSource();
   _progress=new Progress<StatusProgress(progress=>ReportServerProgress(progress);
   StartProcessing(/*input*/,_cts.Token,_progress);
}

public void CancelLoop()
{
   if (_cts!=null)
      _cts.Cancel();
}

处理代码可以在同一个表单上,也可以在任何其他表单上class。事实上,最好将 UI 与处理代码分开,尤其是当您有重要的处理时,例如调用每个服务器以确定其状态

public void StartProcessing(/*input parameters*/,
                    CancellationTokenSource token,
                    IProgress<StatusProgress> progress)
{
    .....
    var po=new ParallelOptions();
    po.CancellationToken=token;
    Parallel.ForEach(namelist, po,line =>
    {
       var status=new StatusProgress();
       status.SystemName = line;//file.ReadLine();
       Status.sVariables result = new Status.sVariables();
       result = OneSystem(line);
       switch (result.BGWResult)
       {
           case -1:
               progress.StatusString = "Login to server failed!";
               break;
               //other status are assigned here;
       }
       progress.Report(status);
    }
}

许多异步 .NET 方法接受取消令牌,因此您可以将其传递给 Web 服务调用,并确保循环和任何未完成的长调用都被取消。