使用 Linq 和集合上的任务实现线程安全
Thread safe with Linq and Tasks on a Collection
给定一些这样的代码
public class CustomCollectionClass : Collection<CustomData> {}
public class CustomData
{
string name;
bool finished;
string result;
}
public async Task DoWorkInParallel(CustomCollectionClass collection)
{
// collection can be retrieved from a DB, may not exist.
if (collection == null)
{
collection = new CustomCollectionClass();
foreach (var data in myData)
{
collection.Add(new CustomData()
{
name = data.Name;
});
}
}
// This part doesn't feel safe. Not sure what to do here.
var processTasks = myData.Select(o =>
this.DoWorkOnItemInCollection(collection.Single(d => d.name = o.Name))).ToArray();
await Task.WhenAll(processTasks);
await SaveModifedCollection(collection);
}
public async Task DoWorkOnItemInCollection(CustomData data)
{
await DoABunchOfWorkElsewhere();
// This doesn't feel safe either. Lock here?
data.finished = true;
data.result = "Parallel";
}
正如我在内联的几条评论中指出的那样,我觉得执行上述操作并不安全,但我不确定。我确实有一个元素集合,我想为每个并行任务分配一个唯一元素,并让这些任务能够根据完成的工作修改集合中的单个元素。最终结果是,我想在并行修改单个不同元素后保存集合。如果这不是一种安全的方法,我该怎么做才能最好?
您的上述代码应该可以正常工作。您正在将一个项目传递给每个工作线程。我不太确定异步属性。您可能只是 return 一个任务,然后在您的方法中执行:
public Task DoWorkOnItemInCollection(CustomData data)
{
return Task.Run(() => {
DoABunchOfWorkElsewhere().Wait();
data.finished = true;
data.result = "Parallel";
});
}
您可能需要小心,对于大量项目,后台线程可能会溢出最大线程数。在这种情况下,c# 只会删除您的线程,以后很难调试。
我以前做过,如果不是将整个集合交给某个神奇的 linq,而是做一个经典的消费者问题,可能会更容易:
class ParallelWorker<T>
{
private Action<T> Action;
private Queue<T> Queue = new Queue<T>();
private object QueueLock = new object();
private void DoWork()
{
while(true)
{
T item;
lock(this.QueueLock)
{
if(this.Queue.Count == 0) return; //exit thread
item = this.Queue.DeQueue();
}
try { this.Action(item); }
catch { /*...*/ }
}
}
public void DoParallelWork(IEnumerable<T> items, int maxDegreesOfParallelism, Action<T> action)
{
this.Action = action;
this.Queue.Clear();
this.Queue.AddRange(items);
List<Thread> threads = new List<Thread>();
for(int i = 0; i < items; i++)
{
ParameterizedThreadStart threadStart = new ParameterizedThreadStart(DoWork);
Thread thread = new Thread(threadStart);
thread.Start();
threads.Add(thread);
}
foreach(Thread thread in threads)
{
thread.Join();
}
}
}
这是IDE免费完成的,所以可能会有拼写错误。
我将建议您使用 Microsoft 的 Reactive Framework (NuGet "Rx-Main") 来完成此任务。
代码如下:
public void DoWorkInParallel(CustomCollectionClass collection)
{
var query =
from x in collection.ToObservable()
from r in Observable.FromAsync(() => DoWorkOnItemInCollection(x))
select x;
query.Subscribe(x => { }, ex => { }, async () =>
{
await SaveModifedCollection(collection);
});
}
完成。而已。仅此而已。
不过我不得不说,当我尝试将您的代码添加到 运行 时,它充满了错误和问题。我怀疑您发布的代码不是您的生产代码,而是您专门为此问题编写的示例。我建议您在发布之前尝试制作一个 运行ning 可编译示例。
尽管如此,我的建议稍作调整后应该对您有用。
它是多线程和线程安全的。完成后它确实会干净地保存修改后的集合。
您的代码是执行此操作的正确方法,假设多次启动 DoABunchOfWorkElsewhere()
本身是安全的。
您不必担心您的 LINQ 查询,因为它实际上 运行 不是并行的。它所做的只是多次调用 DoWorkOnItemInCollection()
。这些调用可能并行工作(或不并行,取决于您的同步上下文和 DoABunchOfWorkElsewhere()
的实现),但您显示的代码是安全的。
给定一些这样的代码
public class CustomCollectionClass : Collection<CustomData> {}
public class CustomData
{
string name;
bool finished;
string result;
}
public async Task DoWorkInParallel(CustomCollectionClass collection)
{
// collection can be retrieved from a DB, may not exist.
if (collection == null)
{
collection = new CustomCollectionClass();
foreach (var data in myData)
{
collection.Add(new CustomData()
{
name = data.Name;
});
}
}
// This part doesn't feel safe. Not sure what to do here.
var processTasks = myData.Select(o =>
this.DoWorkOnItemInCollection(collection.Single(d => d.name = o.Name))).ToArray();
await Task.WhenAll(processTasks);
await SaveModifedCollection(collection);
}
public async Task DoWorkOnItemInCollection(CustomData data)
{
await DoABunchOfWorkElsewhere();
// This doesn't feel safe either. Lock here?
data.finished = true;
data.result = "Parallel";
}
正如我在内联的几条评论中指出的那样,我觉得执行上述操作并不安全,但我不确定。我确实有一个元素集合,我想为每个并行任务分配一个唯一元素,并让这些任务能够根据完成的工作修改集合中的单个元素。最终结果是,我想在并行修改单个不同元素后保存集合。如果这不是一种安全的方法,我该怎么做才能最好?
您的上述代码应该可以正常工作。您正在将一个项目传递给每个工作线程。我不太确定异步属性。您可能只是 return 一个任务,然后在您的方法中执行:
public Task DoWorkOnItemInCollection(CustomData data)
{
return Task.Run(() => {
DoABunchOfWorkElsewhere().Wait();
data.finished = true;
data.result = "Parallel";
});
}
您可能需要小心,对于大量项目,后台线程可能会溢出最大线程数。在这种情况下,c# 只会删除您的线程,以后很难调试。
我以前做过,如果不是将整个集合交给某个神奇的 linq,而是做一个经典的消费者问题,可能会更容易:
class ParallelWorker<T>
{
private Action<T> Action;
private Queue<T> Queue = new Queue<T>();
private object QueueLock = new object();
private void DoWork()
{
while(true)
{
T item;
lock(this.QueueLock)
{
if(this.Queue.Count == 0) return; //exit thread
item = this.Queue.DeQueue();
}
try { this.Action(item); }
catch { /*...*/ }
}
}
public void DoParallelWork(IEnumerable<T> items, int maxDegreesOfParallelism, Action<T> action)
{
this.Action = action;
this.Queue.Clear();
this.Queue.AddRange(items);
List<Thread> threads = new List<Thread>();
for(int i = 0; i < items; i++)
{
ParameterizedThreadStart threadStart = new ParameterizedThreadStart(DoWork);
Thread thread = new Thread(threadStart);
thread.Start();
threads.Add(thread);
}
foreach(Thread thread in threads)
{
thread.Join();
}
}
}
这是IDE免费完成的,所以可能会有拼写错误。
我将建议您使用 Microsoft 的 Reactive Framework (NuGet "Rx-Main") 来完成此任务。
代码如下:
public void DoWorkInParallel(CustomCollectionClass collection)
{
var query =
from x in collection.ToObservable()
from r in Observable.FromAsync(() => DoWorkOnItemInCollection(x))
select x;
query.Subscribe(x => { }, ex => { }, async () =>
{
await SaveModifedCollection(collection);
});
}
完成。而已。仅此而已。
不过我不得不说,当我尝试将您的代码添加到 运行 时,它充满了错误和问题。我怀疑您发布的代码不是您的生产代码,而是您专门为此问题编写的示例。我建议您在发布之前尝试制作一个 运行ning 可编译示例。
尽管如此,我的建议稍作调整后应该对您有用。
它是多线程和线程安全的。完成后它确实会干净地保存修改后的集合。
您的代码是执行此操作的正确方法,假设多次启动 DoABunchOfWorkElsewhere()
本身是安全的。
您不必担心您的 LINQ 查询,因为它实际上 运行 不是并行的。它所做的只是多次调用 DoWorkOnItemInCollection()
。这些调用可能并行工作(或不并行,取决于您的同步上下文和 DoABunchOfWorkElsewhere()
的实现),但您显示的代码是安全的。