结合任务并行库使用 Entity framework

Using Entity framework in conjunction with Task Parallel Library

我有一个正在使用 .NET 4.0 和 EF 6.0 开发的应用程序。该程序的前提非常简单。监视文件系统上的特定文件夹。当新文件放入此文件夹时,在 SQL 服务器数据库(使用 EF)中查找有关此文件的信息,然后根据找到的内容将文件移动到文件系统上的另一个文件夹。文件移动完成后,返回数据库并更新有关此文件的信息(注册文件移动)。

这些是大型媒体文件,因此每个文件都可能需要一段时间才能移动到目标位置。此外,我们可能会在启动此服务时将数百个此类媒体文件放在源文件夹中,这些媒体文件已经需要发送到目标位置。

所以为了加快速度,我开始使用任务并行库(async/await 不可用,因为这是 .NET 4.0)。对于源文件夹中的每个文件,我在数据库中查找有关它的信息,确定它需要移动到哪个目标文件夹,然后启动一个开始移动文件的新任务……

LookupFileinfoinDB(filename)
{
  // use EF DB Context to look up file in DB
}

// start a new task to begin the file move
var moveFileTask = Task<bool>.Factory.StartNew(
                () =>
                    {
                        var success = false;

                        try
                        {
                         // the code to actually moves the file goes here…
                         .......
                         }
                      }

现在,此任务完成后,我必须返回数据库并更新有关文件的信息。这就是我 运行 遇到问题的地方。 (请记住,我可能同时有几个这样的“移动文件任务”运行,它们将在不同的时间完成。目前,我正在使用任务延续来在数据库中注册文件移动:

filemoveTask.ContinueWith(
                       t =>
                       {
                           if (t.IsCompleted && t.Result)
                           {
                             RegisterFileMoveinDB();
                           }
                       }

问题是我使用相同的数据库上下文在主任务中以及稍后在嵌套任务上执行的 RegistetrFilemoveinDB() 方法中查找文件信息。将多个文件一起移动时,我收到了各种奇怪的异常(主要是关于 SQL 服务器数据 reader 等)。在线搜索答案表明,像我在这里所做的那样,在多个任务之间共享数据库上下文是一个很大的禁忌,因为 EF 不是线程安全的。

我不想为每个文件移动创建一个新的数据库上下文,因为可能有几十个甚至数百个文件同时移动。什么是好的替代方法?当嵌套任务完成并完成主任务中的文件移动注册时,有没有办法 'signal' 主任务?还是我以错误的方式一起解决了这个问题,有更好的方法来解决这个问题?

最好的办法是为每个线程确定 DbContext 的范围。 Parallel.ForEach 具有对此有用的重载(具有 Func<TLocal> initLocal:

的重载
Parallel.ForEach( 
    fileNames, // the filenames IEnumerable<string> to be processed
    () => new YourDbContext(), // Func<TLocal> localInit
    ( fileName, parallelLoopState, dbContext ) => // body
    {
        // your logic goes here
        // LookUpFileInfoInDB( dbContext, fileName )
        // MoveFile( ... )
        // RegisterFileMoveInDB( dbContext, ... )

        // pass dbContext along to the next iteration
        return dbContext;
    }
    ( dbContext ) => // Action<TLocal> localFinally
    {
        dbContext.SaveChanges(); // single SaveChanges call for each thread
        dbContext.Dispose();
    } );

如果您希望尽快更新数据库,可以在正文 expression/RegisterFileMoveInDB 中调用 SaveChanges()。我建议将文件系统操作与数据库事务捆绑在一起,这样如果数据库更新失败,文件系统操作就会回滚。

根据@Moho 的问题:

  1. 中的线程,即内置的IO async操作取自 .NET runtime CLR 的线程池,所以它是非常有效的机制。如果 你自己创建线程你用旧的方式做它是 效率低下,尤其是对于 IO 操作。

  2. 当您拨打 async 时,您不必立即等待。推迟等待,直到有必要。

此致。

你也可以通过ExclusiveScheduler of a ConcurrentExclusiveSchedulerPair instance as a parameter of ContinueWith。这样,延续将 运行 按顺序而不是同时相互关联。

TaskScheduler exclusiveScheduler
    = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

//...

filemoveTask.ContinueWith(t => 
{
    if (t.Result)
    {
        RegisterFileMoveinDB();
    }
}, exclusiveScheduler);