TPL 中的每线程实例对象 Parallel.ForEach
Per-thread instance object in TPL Parallel.ForEach
是否有一种 TPL 语法允许您将对象从池中注入到任务中,以便一个对象一次只能由一个线程使用?甚至更好 - 仅由同一个线程使用?
用法示例
假设我想创建 10 个线程来打开 10 个文件:1.txt
、2.txt
、3.txt
... 10.txt
并随机写入 500 000 个后续数字到这些文件。
我能做到:
ConcurrentQueue<int> objs = new ConcurrentQueue<int>(); // 500000 numbers go here
Task[] tasks = Enumerable.Range(1, 10)
.Select(i =>
{
return Task.Factory.StartNew(() =>
{
using (var f = File.Open($"{i}.txt"))
{
using (var wr = StreamWriter(f))
{
while (objs.TryDequeue(out int obj))
{
wr.WriteLine(obj);
}
}
}
}
})
.ToArray();
Task.WaitAll(tasks);
但是,是否可以在不使用并发集合的情况下仅使用 TPL 来提供相同的行为?
不,没有。
最接近的解决方案是手动创建 N 个线程(使用 Task
或 Parallel.For
/ Parallel.ForEach
)并使用 ConcurrentQueue
线程安全地分发数据。
如果除了最后两次编辑之外的所有内容都被删除会更好。
如果问题是Can you pass an object per task (not thread) when using Parallel.
?答案是:可以,通过 any of the overloads that accept local state, ie have a TLocal
type like this one :
public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally
)
Parallel.For
不使用线程。它对数据进行分区并为每个分区创建一个任务。每个任务最终处理一个分区的所有数据。通常,Parallel
使用与内核一样多的任务。它还使用current线程进行处理,这就是它出现阻塞当前线程的原因。它没有,它开始用于处理其中一个分区。
处理本地数据的函数允许您生成初始本地值并将其传递给每个 body
调用。所有使用本地数据的重载都需要 body
重新调整(可能修改过的)数据,因此 Parallel
本身 不必 存储它。这是必不可少的,因为 Parallel.
可以终止和重新启动任务。如果它必须跟踪本地数据,就无法轻松或高效地完成此操作。
对于这个特定的示例,并绕过 ORM 不适合批量操作这一事实,尤其是在处理数十万个对象时,localInit
应该创建一个新会话。 body
应该使用并且 return 该会话,而最后,localFinally
应该处理它。
var mySessionFactory
var myData=....;
Parallel.ForEach(
myData,
()=>CreateSession(),
(record,state,session)=>{
//process the data etc.
return session;
},
(session)=>session.Dispose()
);
还有一些警告。 NH 将更改保存在内存中,直到它们被刷新并且缓存被清除。这会产生内存问题。一种解决方案是保持计数并定期刷新数据。状态可以是 (int counter,Session session)
元组,而不是会话:
Parallel.ForEach(
myData,
()=>(counter:0,session:CreateSession()),
(record,state,localData)=>{
var (counter,session)=localData;
//process the data etc.
if (counter % 1000 ==0)
{
session.Flush();
session.Clear();
}
return (++counter,session);
},
data=>data.session.Dispose()
);
更好 的解决方案是提前对对象进行批处理,这样循环就可以在 IEnumerable<MyRecord[]>
数组上运行,而不是 IEnumerable<MyRecord>
。结合批处理语句,这将减少 ORM 对批量操作施加的性能损失。
编写 Batch
方法并不难,但 MoreLinq 已经提供了一个方法,可作为源代码或 NuGet 包使用:
var myBatches=myData.Batch(1000);
Parallel.ForEach(
myBatches,
()=>CreateSession(),
(records,state,session)=>{
foreach(var record in records)
{
//process the data etc.
session.Save(record);
}
session.Flush();
session.Clear();
return session;
},
data=>data.session.Dispose()
);
是否有一种 TPL 语法允许您将对象从池中注入到任务中,以便一个对象一次只能由一个线程使用?甚至更好 - 仅由同一个线程使用?
用法示例
假设我想创建 10 个线程来打开 10 个文件:1.txt
、2.txt
、3.txt
... 10.txt
并随机写入 500 000 个后续数字到这些文件。
我能做到:
ConcurrentQueue<int> objs = new ConcurrentQueue<int>(); // 500000 numbers go here
Task[] tasks = Enumerable.Range(1, 10)
.Select(i =>
{
return Task.Factory.StartNew(() =>
{
using (var f = File.Open($"{i}.txt"))
{
using (var wr = StreamWriter(f))
{
while (objs.TryDequeue(out int obj))
{
wr.WriteLine(obj);
}
}
}
}
})
.ToArray();
Task.WaitAll(tasks);
但是,是否可以在不使用并发集合的情况下仅使用 TPL 来提供相同的行为?
不,没有。
最接近的解决方案是手动创建 N 个线程(使用 Task
或 Parallel.For
/ Parallel.ForEach
)并使用 ConcurrentQueue
线程安全地分发数据。
如果除了最后两次编辑之外的所有内容都被删除会更好。
如果问题是Can you pass an object per task (not thread) when using Parallel.
?答案是:可以,通过 any of the overloads that accept local state, ie have a TLocal
type like this one :
public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally
)
Parallel.For
不使用线程。它对数据进行分区并为每个分区创建一个任务。每个任务最终处理一个分区的所有数据。通常,Parallel
使用与内核一样多的任务。它还使用current线程进行处理,这就是它出现阻塞当前线程的原因。它没有,它开始用于处理其中一个分区。
处理本地数据的函数允许您生成初始本地值并将其传递给每个 body
调用。所有使用本地数据的重载都需要 body
重新调整(可能修改过的)数据,因此 Parallel
本身 不必 存储它。这是必不可少的,因为 Parallel.
可以终止和重新启动任务。如果它必须跟踪本地数据,就无法轻松或高效地完成此操作。
对于这个特定的示例,并绕过 ORM 不适合批量操作这一事实,尤其是在处理数十万个对象时,localInit
应该创建一个新会话。 body
应该使用并且 return 该会话,而最后,localFinally
应该处理它。
var mySessionFactory
var myData=....;
Parallel.ForEach(
myData,
()=>CreateSession(),
(record,state,session)=>{
//process the data etc.
return session;
},
(session)=>session.Dispose()
);
还有一些警告。 NH 将更改保存在内存中,直到它们被刷新并且缓存被清除。这会产生内存问题。一种解决方案是保持计数并定期刷新数据。状态可以是 (int counter,Session session)
元组,而不是会话:
Parallel.ForEach(
myData,
()=>(counter:0,session:CreateSession()),
(record,state,localData)=>{
var (counter,session)=localData;
//process the data etc.
if (counter % 1000 ==0)
{
session.Flush();
session.Clear();
}
return (++counter,session);
},
data=>data.session.Dispose()
);
更好 的解决方案是提前对对象进行批处理,这样循环就可以在 IEnumerable<MyRecord[]>
数组上运行,而不是 IEnumerable<MyRecord>
。结合批处理语句,这将减少 ORM 对批量操作施加的性能损失。
编写 Batch
方法并不难,但 MoreLinq 已经提供了一个方法,可作为源代码或 NuGet 包使用:
var myBatches=myData.Batch(1000);
Parallel.ForEach(
myBatches,
()=>CreateSession(),
(records,state,session)=>{
foreach(var record in records)
{
//process the data etc.
session.Save(record);
}
session.Flush();
session.Clear();
return session;
},
data=>data.session.Dispose()
);