通过使用任务加快处理改进

Speed processing improvement by using tasks

我有以下代码:

class Program
{
    class ProcessedEven
    {
        public int ProcessedInt { get; set; }

        public DateTime ProcessedValue { get; set; }
    }

    class ProcessedOdd
    {
        public int ProcessedInt { get; set; }

        public string ProcessedValue { get; set; }
    }

    static void Main(string[] args)
    {
        Stopwatch stopwatch = new Stopwatch();

        IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
        Dictionary<int, ProcessedOdd> processedOddValuesDictionary = new Dictionary<int, ProcessedOdd>();
        Dictionary<int, ProcessedEven> processedEvenValuesDictionary = new Dictionary<int, ProcessedEven>();

        stopwatch.Start();

        while (enumerator.MoveNext())
        {
            int currentNumber = enumerator.Current;

            if (currentNumber % 2 == 0)
            {
                Task.Run(() =>
                {
                    ProcessedEven processedEven =
                        new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
                    await Task.Delay(100);

                    processedEvenValuesDictionary.Add(currentNumber, processedEven);
                });
            }
            else
            {
                Task.Run(() =>
                {
                    ProcessedOdd processedOdd =
                        new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
                    await Task.Delay(100);

                    processedOddValuesDictionary.Add(currentNumber, processedOdd);
                });
            }
        }

        stopwatch.Stop();

        Console.WriteLine(stopwatch.Elapsed.TotalSeconds);

        Console.ReadKey();
    }

所以基本上我必须迭代一个始终同步的枚举器。

一旦迭代器的当前值被获取,它就会被处理,不知何故需要很长时间。之后根据其值被添加到字典中进行处理。所以最后必须用正确的值填充字典。

为了提高速度,我认为引入一些并行性可能会有所帮助,但在添加 "Task.Run" 调用之后,一些

"System.NullReferenceException: 'Object reference not set to an instance of an object"

出现异常。与此代码的 "synchronous" 版本(没有 "Task.Run" 调用的版本)相比,执行时间也增加了。

我不明白为什么会出现这些异常,因为一切似乎都不为空。

有没有办法通过使用多线程来提高这种情况下的速度(原始代码没有 "Task.Run" 调用)?

是否应该在 lock 语句中将已处理的元素添加到字典中,因为字典似乎在任务之间共享?

您应该使用 ConcurrentDictionary,它是 key/value 对的线程安全集合,可以被多个线程同时访问。

ConcurrentDictionary是为多线程场景设计的。您不必在代码中使用锁来添加或删除集合中的项目。但是,总是有可能一个线程检索一个值,而另一个线程通过为同一个键赋予一个新值来立即更新集合。

当我在将 Dictionary 更改为 ConcurrentDictionary 后 运行 您的代码时,代码在没有 NullReferenceException 的情况下运行并在 ~1.37 秒内完成。

完整代码:

    class Program
    {
        class ProcessedEven
        {
            public int ProcessedInt { get; set; }

            public DateTime ProcessedValue { get; set; }
        }

        class ProcessedOdd
        {
            public int ProcessedInt { get; set; }

            public string ProcessedValue { get; set; }
        }

        static void Main(string[] args)
        {
            Stopwatch stopwatch = new Stopwatch();

            IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
            ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
            ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();

            stopwatch.Start();

            while (enumerator.MoveNext())
            {
                int currentNumber = enumerator.Current;

                if (currentNumber % 2 == 0)
                {
                    Task.Run(() =>
                    {
                        ProcessedEven processedEven =
                            new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
                        Task.Delay(100);

                        processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
                    });
                }
                else
                {
                    Task.Run(() =>
                    {
                        ProcessedOdd processedOdd =
                            new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
                        Task.Delay(100);

                        processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
                    });
                }
            }

            stopwatch.Stop();

            Console.WriteLine(stopwatch.Elapsed.TotalSeconds);

            Console.ReadKey();
        }
    }

您正在创建许多小任务并通过调用 Task.Run 耗尽您的线程池。您最好使用 Parallel.ForEach 以获得更好的性能。正如@user1672994 所说,您应该使用 Dictionary - ConcurrentDictionary

的线程安全版本
static void Main(string[] args)
{
    Stopwatch stopwatch = new Stopwatch();

    IEnumerable<int> enumerable = Enumerable.Range(0, 100000);
    ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
    ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();

    stopwatch.Start();

    Parallel.ForEach(enumerable,
        currentNumber =>
            {
                if (currentNumber % 2 == 0)
                {
                    ProcessedEven processedEven =
                        new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
                    // Task.Delay(100);

                    processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
                }
                else
                {
                    ProcessedOdd processedOdd =
                        new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
                    // Task.Delay(100);

                    processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
                }
            });

    stopwatch.Stop();

    Console.WriteLine(stopwatch.Elapsed.TotalSeconds);

    Console.ReadKey();
}

我也不明白为什么您的代码中需要 Task.Delay(100)。无论如何,如果没有 await 运算符,异步操作会做一些您可能不会想到的事情。 Ether 使用 await 或使用同步版本 Thread.Sleep(100)

您获得 NullReferenceException 的具体原因是 Dictionary 容器的内部状态已损坏。可能有两个线程试图并行调整 Dictionary 的两个内部数组的大小,或者其他同样令人讨厌的事情。实际上,您很幸运遇到这些异常,因为更糟糕的结果是拥有一个产生错误结果的工作程序。

这个问题更普遍的原因是您允许并行异步访问线程不安全的对象。 Dictionary class 与大多数内置 .NET classes 一样,不是线程安全的。它是在假设将由单个线程访问(或至少一次由一个线程访问)的情况下实现的。它不包含内部同步。原因是在 class 中添加同步会导致 API 复杂性和性能开销,并且没有理由在每次使用此 class 时都支付此开销,因为它只会在一些特殊情况下需要。

您的问题有多种解决方案。一种是继续使用线程不安全的 Dictionary,但确保使用锁以独占方式访问它。这是最灵活的解决方案,但您需要非常小心,不允许任何未受保护的代码路径指向该对象。访问 every 属性 和 every 方法,读取 写入它,必须是在 lock 里面。所以灵活但脆弱,在竞争激烈的情况下可能成为性能瓶颈(即并发请求排他锁的线程过多,被迫排队等候)。

另一种解决方案是使用像ConcurrentDictionary 这样的线程安全容器。 class 确保其内部状态在被多个线程并行访问时永远不会损坏。不幸的是,它无法确保程序的其余状态。所以它适用于一些简单的情况,除了字典本身,你没有其他共享状态。在这些情况下,它提供了性能改进,因为它是通过细粒度内部锁定实现的(有多个锁,一个用于每个数据段)。

最好的解决方案是通过消除共享状态来完全消除对线程同步的需要。只需让每个线程处理其内部隔离的子集或数据,并且仅在所有线程完成后才合并这些子集。这通常会提供最佳性能,但代价是必须对初始工作负载进行分区,然后编写最终的合并代码。有些库遵循这种策略,但正在处理所有这些样板文件,让您编写尽可能少的代码。最好的之一是 TPL Dataflow library,它实际上嵌入在 .NET Core 平台中。对于 .NET Framework,您需要安装一个包才能使用它。