在不同的、独立的对象上工作的 C# 任务,仍然会出现同步错误,为什么?

C# Tasks that work on different, independent objects, synchronization errors still occur, why?

我的程序是 运行 个任务,每次都是 n 个任务组。 每个任务将数据写入自己的 Queue<string> 对象,由 List<Queue<string>> 队列中的 Queue<string> 的索引提供。 任务不共享数据或队列,但我仍然遇到同步错误。 我知道数据结构不是线程安全的,我不明白为什么它们应该是线程安全的,以及为什么我会收到错误,因为每个 Task 都有自己的数据结构,什么会导致错误?

这里有一个简单的代码来演示:

class Program
{
    static int j = 0;
    List<Queue<string>> queueList = new List<Queue<string>>();

    public void StartTasts(int n)
    {
        for (int i = 0; i < n; i++)
            queueList.Add(new Queue<string>());

        List<Task> tsk = new List<Task>();
        for (int TaskGroup = 0; TaskGroup < 10; TaskGroup++)
        {   //10 groups of task
            //each group has 'n' tasks working in parallel
            for (int i = 0; i < n; i++)
            {
                //each task gets its own and independent queue from the list
                tsk.Add(Task.Factory.StartNew(() =>
                {
                    DoWork(j % n);
                }));
                j++;
            }
            //waiting for each task group to finish
            foreach (Task t in tsk)
                t.Wait();
            //after they all finished working with the queues, clear queues
            //making them ready for the nest task group
            foreach (Queue<string> q in queueList)
                q.Clear();
        }
    }

    public void DoWork(int queue)
    {
        //demonstration of generating strings 
        //and put them in the correct queue
        for (int k = 0; k < 10000; k++)
            queueList[queue].Enqueue(k + "");
    }


    static void Main(string[] args)
    {
        new Program().StartTasts(10);
    }

}

此程序会产生一些错误,例如:

System.ArgumentException: 'Destination array was not long enough. Check destIndex and length, and the array's lower bounds.'

System.IndexOutOfRangeException: 'Index was outside the bounds of the array.' (at the Queue)

System.AggregateException: One or more errors occurred. ---> System.ArgumentException: Source array was not long enough. Check srcIndex and length, and the array's lower bounds.

以及更多错误,不会出现在连载案例中。 我很想知道为什么,因为我看不到这些任务如何弄乱彼此的独立队列。

我不认为你的问题出在问题上,它似乎在列表本身中可能是一个问题。

作为使用并行或同步进程的规则,列表不是线程保存 DS。

尝试使用线程保存 DS Like ConcurrentBag Class

问题是正常的变量闭包问题。因为所有任务都共享变量 j 的同一个实例,所以它们都将共享相同的值,很可能发生的情况是您的循环非常快速地启动 10 个任务,但在它们中的任何一个都可以达到 j % n 的值之前j 已经变成 10.

制作一个在 for 循环范围内声明的 k 的本地副本,它应该可以解决您的问题。

public void StartTasts(int n)
{
    for (int i = 0; i < n; i++)
        queueList.Add(new Queue<string>());

    List<Task> tsk = new List<Task>();
    for (int TaskGroup = 0; TaskGroup < 10; TaskGroup++)
    {   //10 groups of task
        //each group has 'n' tasks working in parallel
        for (int i = 0; i < n; i++)
        {
            int k = j; // `int k = i;` would work here too and give you the same results.

            tsk.Add(Task.Factory.StartNew(() =>
            {
                DoWork(k % n);
            }));
            j++;
        }
        //waiting for each task group to finish
        foreach (Task t in tsk)
            t.Wait();
        //after they all finished working with the queues, clear queues
        //making them ready for the nest task group
        foreach (Queue<string> q in queueList)
            q.Clear();
    }
}

如果您想通过更简单的娱乐来查看实际问题,请改用这个简单的代码。

public static void Main(string[] args)
{

    for (int i = 0; i < 10; i++)
    {
        int j = i;
        Task.TaskFactory.StartNew(() =>
        {
            Thread.Sleep(10); //Give a little time for the for loop to complete.
            Console.WriteLine("i: " + i + " j: " + j);
        }
    });
    Console.ReadLine();
}

您已在任务内计算出taskId,并在任务外更改了计算基数。 我只是稍微改变了逻辑。我没有任何错误。

namespace Project1
{
    using System.Collections.Generic;
    using System.Threading.Tasks;

    internal class Program
    {
        private static int j = 0;
        private readonly List<Queue<string>> queueList = new List<Queue<string>>();

        public void StartTasts(int n)
        {
            for (var i = 0; i < n; i++)
            {
                this.queueList.Add(new Queue<string>());
            }

            var taskList = new List<Task>();
            for (var taskGroup = 0; taskGroup < 10; taskGroup++)
            {
                // 10 groups of task
                // each group has 'n' tasks working in parallel
                for (var i = 0; i < n; i++)
                {
                    // each task gets its own and independent queue from the list
                    var taskId = j % n;
                    taskList.Add(
                        Task.Factory.StartNew(
                            () =>
                            {
                                this.DoWork(taskId);
                            }));
                    j++;
                }

                // waiting for each task group to finish
                foreach (var t in taskList)
                {
                    t.Wait();
                }

                // after they all finished working with the queues, clear queues
                // making them ready for the nest task group
                foreach (var q in this.queueList)
                {
                    q.Clear();
                }
            }
        }

        public void DoWork(int queue)
        {
            // demonstration of generating strings 
            // and put them in the correct queue
            for (var k = 0; k < 10000; k++)
            {
                this.queueList[queue].Enqueue(k + string.Empty);
            }
        }

        private static void Main(string[] args)
        {
            new Program().StartTasts(10);
        }      
    }
}