LINQ 中的 C# 并行性

C# parallelism in LINQ

我想知道这是否是在下面的代码中计算 x 的安全方法。

public static IEnumerable<Object> Parse(Object obj, ref int x)
{
    x += 10;
    return new List<Object>();
}

public static void Query(List<Object> obj)
{
    int x = 0;

    var result = obj
        .AsParallel()
        .Select(o => Parse(o, ref x))
        .Aggregate((a, b) => a.Concat(b));
}

这是我的代码的简化版本。我希望 x 成为某种 static counter for all parallel executions of Parse。我希望这不会造成混淆。

您的代码存在竞争条件。即使变量 x 是通过引用传递的,它在所有并发执行中仍然是同一个变量,因此向它添加 10 需要是原子的。

解决此问题的一种方法是使用 Interlocked.Add 方法而不是 +=:

public static IEnumerable<Object> Parse(Object obj, ref int x)
{
    Interlocked.Add(ref x, 10);
    return new List<Object>();
}

绝对不安全。 您需要使用 Interlocked class.

public static IEnumerable<Object> Parse(Object obj, ref int x)
{
    Interlocked.Add(ref x, 10);
    return new List<Object>();
}

我会建议一种不同的方法来解决这个问题,正如之前建议的那样,在并行代码中引入同步结构会影响它的工作,如果你仍然需要它,那么你的原始代码需要像 Interlocked / lock 这样的东西来实现它线程安全,但是

更好的方法是每个线程都有一个本地计数器并在最后聚合它,如下所示:

public class MyClass
{
  public int x;
  public object o;
}

public static IEnumerable<MyClass> Parse(Object obj)
{
    MyClass c = new MyClass();
    c.x += 10;
    c.o  = <some new object>
    // Add c to instance of List<MyClass>
    return new List<MyClass>();
}

public static void Query(List<Object> obj)
{          
    var result = obj
        .AsParallel()
        .Select(o => Parse(o))

   // result is of type IEnumerable<MyClass>

   var sum = result.Sum(a=>a.x);

   var aggregate = result.Aggregate((a, b) => a.o.Concat(b.o));
}

这是一个无锁/同步的解决方案,没有性能影响,也没有竞争条件。始终用于线程,尝试使线程局部化,然后为每个单独的线程变量应用类似 sum 的函数。