在 Parallel.Invoke 中更新同一对象的不同属性是线程安全的吗?

Updating different properties of same object in Parallel.Invoke is thread-safe?

我正在使用包含复杂属性的 class。这些属性中的每一个都是通过不同的方法计算的。我正在使用 Parallel.Invoke 来更新同一对象的不同属性。这会对对象造成任何问题吗?

// sample class definition. I've simplified the example by using 'object' type
// for complex types. 
public class TestResult
{
     public object Property1;

     public object Property2;

     public object Property3;
}

// here we populate an object. We are processing it parallelly because each method
// takes some considerable amount of time. 
var testResult = new TestResult();
Parallel.Invoke(
() =>
{
       testResult.Property1 = GetProperty1Value();
},
() =>
{
       testResult.Property2 = GetProperty2Value();
},
() =>
{
       testResult.Property3 = GetProperty3Value();
});

以上代码是否会对 testResult 对象造成任何问题?

注意:我已经测试了这部分代码。似乎不会引起任何问题。据我所知,由于在不同的任务中使用不同的属性,所以这应该不是问题。我找不到关于此的任何文档。我想确认这种行为,因此提出这个问题。

首先应该提到的是,您示例中的 Property1Property2Property3 在技术上称为 fields, not properties

Parallel.Invoke 操作成功完成后,关于 TestResult 实例的完整性,您的示例是完全安全的。 它的所有字段都将被初始化,并且它们的值 will be visible 由当前线程(但在 Parallel.Invoke 完成之前已经 运行 的其他线程不一定可见)。

另一方面,如果 Parallel.Invoke 失败,则 TestResult 实例可能最终被部分初始化。

如果 Property1Property2Property3 实际上是 properties,那么您的代码的线程安全性将取决于代码 运行在这些属性的 set 访问器后面。如果此代码很简单,例如 set { _property1 = value; },那么您的代码同样是安全的。

作为旁注,建议您使用合理的 MaxDegreeOfParallelism 配置 Parallel.Invoke 操作。否则,您将获得 Parallel class 的默认行为,即 saturate the ThreadPool.

TestResult testResult = new();

Parallel.Invoke(new ParallelOptions()
{ MaxDegreeOfParallelism = Environment.ProcessorCount },
    () => testResult.Property1 = GetProperty1Value(),
    () => testResult.Property2 = GetProperty2Value(),
    () => testResult.Property3 = GetProperty3Value()
);

替代方案: 如果您想知道如何在不依赖闭包和副作用的情况下初始化一个 TestResult 实例,这里是 一种方法:

var taskFactory = new TaskFactory(new ConcurrentExclusiveSchedulerPair(
    TaskScheduler.Default, Environment.ProcessorCount).ConcurrentScheduler);

var task1 = taskFactory.StartNew(() => GetProperty1Value());
var task2 = taskFactory.StartNew(() => GetProperty2Value());
var task3 = taskFactory.StartNew(() => GetProperty3Value());

Task.WaitAll(task1, task2, task3);

TestResult testResult = new()
{
    Property1 = task1.Result,
    Property2 = task2.Result,
    Property3 = task3.Result,
};

属性的值暂时存储在各个Task对象中,最后在所有任务完成后,在当前线程上分配给属性。因此,这种方法消除了所有关于构造 TestResult 实例完整性的线程安全考虑。

但有一个缺点:Parallel.Invoke 使用当前线程,并在其上调用一些操作。相反,Task.WaitAll 方法将浪费地阻塞当前线程,让 ThreadPool 完成所有工作。


纯属娱乐:我尝试写了一个ObjectInitializer工具,应该可以并行计算一个对象的属性,然后赋值给每个 属性 顺序(线程安全),无需手动管理一堆分散的 Task 变量。这是我想出的API:

var initializer = new ObjectInitializer<TestResult>();
initializer.Add(() => GetProperty1Value(), (x, v) => x.Property1 = v);
initializer.Add(() => GetProperty2Value(), (x, v) => x.Property2 = v);
initializer.Add(() => GetProperty3Value(), (x, v) => x.Property3 = v);
TestResult testResult = initializer.RunParallel(degreeOfParallelism: 2);

不是很漂亮,但至少很简洁。 Add 方法为一个 属性 添加元数据,RunParallel 进行并行和顺序工作。这是实现:

public class ObjectInitializer<TObject> where TObject : new()
{
    private readonly List<Func<Action<TObject>>> _functions = new();

    public void Add<TProperty>(Func<TProperty> calculate,
        Action<TObject, TProperty> update)
    {
        _functions.Add(() =>
        {
            TProperty value = calculate();
            return source => update(source, value);
        });
    }

    public TObject RunParallel(int degreeOfParallelism)
    {
        TObject instance = new();
        _functions
            .AsParallel()
            .AsOrdered()
            .WithDegreeOfParallelism(degreeOfParallelism)
            .Select(func => func())
            .ToList()
            .ForEach(action => action(instance));
        return instance;
    }
}

它使用 PLINQ 而不是 Parallel class。

我会用吗?可能不会。主要是因为并行初始化一个对象的需要并不经常出现,而且在这种罕见的情况下不得不维护如此晦涩的代码似乎有点过分了。我可能会改用肮脏且有副作用的 Parallel.Invoke 方法。 :-)