通过使用任务加快处理改进
Speed processing improvement by using tasks
我有以下代码:
class Program
{
class ProcessedEven
{
public int ProcessedInt { get; set; }
public DateTime ProcessedValue { get; set; }
}
class ProcessedOdd
{
public int ProcessedInt { get; set; }
public string ProcessedValue { get; set; }
}
static void Main(string[] args)
{
Stopwatch stopwatch = new Stopwatch();
IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
Dictionary<int, ProcessedOdd> processedOddValuesDictionary = new Dictionary<int, ProcessedOdd>();
Dictionary<int, ProcessedEven> processedEvenValuesDictionary = new Dictionary<int, ProcessedEven>();
stopwatch.Start();
while (enumerator.MoveNext())
{
int currentNumber = enumerator.Current;
if (currentNumber % 2 == 0)
{
Task.Run(() =>
{
ProcessedEven processedEven =
new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
await Task.Delay(100);
processedEvenValuesDictionary.Add(currentNumber, processedEven);
});
}
else
{
Task.Run(() =>
{
ProcessedOdd processedOdd =
new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
await Task.Delay(100);
processedOddValuesDictionary.Add(currentNumber, processedOdd);
});
}
}
stopwatch.Stop();
Console.WriteLine(stopwatch.Elapsed.TotalSeconds);
Console.ReadKey();
}
所以基本上我必须迭代一个始终同步的枚举器。
一旦迭代器的当前值被获取,它就会被处理,不知何故需要很长时间。之后根据其值被添加到字典中进行处理。所以最后必须用正确的值填充字典。
为了提高速度,我认为引入一些并行性可能会有所帮助,但在添加 "Task.Run" 调用之后,一些
"System.NullReferenceException: 'Object reference not set to an instance of an object"
出现异常。与此代码的 "synchronous" 版本(没有 "Task.Run" 调用的版本)相比,执行时间也增加了。
我不明白为什么会出现这些异常,因为一切似乎都不为空。
有没有办法通过使用多线程来提高这种情况下的速度(原始代码没有 "Task.Run" 调用)?
是否应该在 lock 语句中将已处理的元素添加到字典中,因为字典似乎在任务之间共享?
您应该使用 ConcurrentDictionary
,它是 key/value 对的线程安全集合,可以被多个线程同时访问。
ConcurrentDictionary
是为多线程场景设计的。您不必在代码中使用锁来添加或删除集合中的项目。但是,总是有可能一个线程检索一个值,而另一个线程通过为同一个键赋予一个新值来立即更新集合。
当我在将 Dictionary
更改为 ConcurrentDictionary
后 运行 您的代码时,代码在没有 NullReferenceException
的情况下运行并在 ~1.37 秒内完成。
完整代码:
class Program
{
class ProcessedEven
{
public int ProcessedInt { get; set; }
public DateTime ProcessedValue { get; set; }
}
class ProcessedOdd
{
public int ProcessedInt { get; set; }
public string ProcessedValue { get; set; }
}
static void Main(string[] args)
{
Stopwatch stopwatch = new Stopwatch();
IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();
stopwatch.Start();
while (enumerator.MoveNext())
{
int currentNumber = enumerator.Current;
if (currentNumber % 2 == 0)
{
Task.Run(() =>
{
ProcessedEven processedEven =
new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
Task.Delay(100);
processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
});
}
else
{
Task.Run(() =>
{
ProcessedOdd processedOdd =
new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
Task.Delay(100);
processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
});
}
}
stopwatch.Stop();
Console.WriteLine(stopwatch.Elapsed.TotalSeconds);
Console.ReadKey();
}
}
您正在创建许多小任务并通过调用 Task.Run 耗尽您的线程池。您最好使用 Parallel.ForEach
以获得更好的性能。正如@user1672994 所说,您应该使用 Dictionary
- ConcurrentDictionary
的线程安全版本
static void Main(string[] args)
{
Stopwatch stopwatch = new Stopwatch();
IEnumerable<int> enumerable = Enumerable.Range(0, 100000);
ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();
stopwatch.Start();
Parallel.ForEach(enumerable,
currentNumber =>
{
if (currentNumber % 2 == 0)
{
ProcessedEven processedEven =
new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
// Task.Delay(100);
processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
}
else
{
ProcessedOdd processedOdd =
new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
// Task.Delay(100);
processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
}
});
stopwatch.Stop();
Console.WriteLine(stopwatch.Elapsed.TotalSeconds);
Console.ReadKey();
}
我也不明白为什么您的代码中需要 Task.Delay(100)
。无论如何,如果没有 await
运算符,异步操作会做一些您可能不会想到的事情。 Ether 使用 await 或使用同步版本 Thread.Sleep(100)
您获得 NullReferenceException
的具体原因是 Dictionary
容器的内部状态已损坏。可能有两个线程试图并行调整 Dictionary
的两个内部数组的大小,或者其他同样令人讨厌的事情。实际上,您很幸运遇到这些异常,因为更糟糕的结果是拥有一个产生错误结果的工作程序。
这个问题更普遍的原因是您允许并行异步访问线程不安全的对象。 Dictionary
class 与大多数内置 .NET classes 一样,不是线程安全的。它是在假设将由单个线程访问(或至少一次由一个线程访问)的情况下实现的。它不包含内部同步。原因是在 class 中添加同步会导致 API 复杂性和性能开销,并且没有理由在每次使用此 class 时都支付此开销,因为它只会在一些特殊情况下需要。
您的问题有多种解决方案。一种是继续使用线程不安全的 Dictionary
,但确保使用锁以独占方式访问它。这是最灵活的解决方案,但您需要非常小心,不允许任何未受保护的代码路径指向该对象。访问 every 属性 和 every 方法,读取 或 写入它,必须是在 lock
里面。所以灵活但脆弱,在竞争激烈的情况下可能成为性能瓶颈(即并发请求排他锁的线程过多,被迫排队等候)。
另一种解决方案是使用像ConcurrentDictionary
这样的线程安全容器。 class 确保其内部状态在被多个线程并行访问时永远不会损坏。不幸的是,它无法确保程序的其余状态。所以它适用于一些简单的情况,除了字典本身,你没有其他共享状态。在这些情况下,它提供了性能改进,因为它是通过细粒度内部锁定实现的(有多个锁,一个用于每个数据段)。
最好的解决方案是通过消除共享状态来完全消除对线程同步的需要。只需让每个线程处理其内部隔离的子集或数据,并且仅在所有线程完成后才合并这些子集。这通常会提供最佳性能,但代价是必须对初始工作负载进行分区,然后编写最终的合并代码。有些库遵循这种策略,但正在处理所有这些样板文件,让您编写尽可能少的代码。最好的之一是 TPL Dataflow library,它实际上嵌入在 .NET Core 平台中。对于 .NET Framework,您需要安装一个包才能使用它。
我有以下代码:
class Program
{
class ProcessedEven
{
public int ProcessedInt { get; set; }
public DateTime ProcessedValue { get; set; }
}
class ProcessedOdd
{
public int ProcessedInt { get; set; }
public string ProcessedValue { get; set; }
}
static void Main(string[] args)
{
Stopwatch stopwatch = new Stopwatch();
IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
Dictionary<int, ProcessedOdd> processedOddValuesDictionary = new Dictionary<int, ProcessedOdd>();
Dictionary<int, ProcessedEven> processedEvenValuesDictionary = new Dictionary<int, ProcessedEven>();
stopwatch.Start();
while (enumerator.MoveNext())
{
int currentNumber = enumerator.Current;
if (currentNumber % 2 == 0)
{
Task.Run(() =>
{
ProcessedEven processedEven =
new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
await Task.Delay(100);
processedEvenValuesDictionary.Add(currentNumber, processedEven);
});
}
else
{
Task.Run(() =>
{
ProcessedOdd processedOdd =
new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
await Task.Delay(100);
processedOddValuesDictionary.Add(currentNumber, processedOdd);
});
}
}
stopwatch.Stop();
Console.WriteLine(stopwatch.Elapsed.TotalSeconds);
Console.ReadKey();
}
所以基本上我必须迭代一个始终同步的枚举器。
一旦迭代器的当前值被获取,它就会被处理,不知何故需要很长时间。之后根据其值被添加到字典中进行处理。所以最后必须用正确的值填充字典。
为了提高速度,我认为引入一些并行性可能会有所帮助,但在添加 "Task.Run" 调用之后,一些
"System.NullReferenceException: 'Object reference not set to an instance of an object"
出现异常。与此代码的 "synchronous" 版本(没有 "Task.Run" 调用的版本)相比,执行时间也增加了。
我不明白为什么会出现这些异常,因为一切似乎都不为空。
有没有办法通过使用多线程来提高这种情况下的速度(原始代码没有 "Task.Run" 调用)?
是否应该在 lock 语句中将已处理的元素添加到字典中,因为字典似乎在任务之间共享?
您应该使用 ConcurrentDictionary
,它是 key/value 对的线程安全集合,可以被多个线程同时访问。
ConcurrentDictionary
是为多线程场景设计的。您不必在代码中使用锁来添加或删除集合中的项目。但是,总是有可能一个线程检索一个值,而另一个线程通过为同一个键赋予一个新值来立即更新集合。
当我在将 Dictionary
更改为 ConcurrentDictionary
后 运行 您的代码时,代码在没有 NullReferenceException
的情况下运行并在 ~1.37 秒内完成。
完整代码:
class Program
{
class ProcessedEven
{
public int ProcessedInt { get; set; }
public DateTime ProcessedValue { get; set; }
}
class ProcessedOdd
{
public int ProcessedInt { get; set; }
public string ProcessedValue { get; set; }
}
static void Main(string[] args)
{
Stopwatch stopwatch = new Stopwatch();
IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();
stopwatch.Start();
while (enumerator.MoveNext())
{
int currentNumber = enumerator.Current;
if (currentNumber % 2 == 0)
{
Task.Run(() =>
{
ProcessedEven processedEven =
new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
Task.Delay(100);
processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
});
}
else
{
Task.Run(() =>
{
ProcessedOdd processedOdd =
new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
Task.Delay(100);
processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
});
}
}
stopwatch.Stop();
Console.WriteLine(stopwatch.Elapsed.TotalSeconds);
Console.ReadKey();
}
}
您正在创建许多小任务并通过调用 Task.Run 耗尽您的线程池。您最好使用 Parallel.ForEach
以获得更好的性能。正如@user1672994 所说,您应该使用 Dictionary
- ConcurrentDictionary
static void Main(string[] args)
{
Stopwatch stopwatch = new Stopwatch();
IEnumerable<int> enumerable = Enumerable.Range(0, 100000);
ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();
stopwatch.Start();
Parallel.ForEach(enumerable,
currentNumber =>
{
if (currentNumber % 2 == 0)
{
ProcessedEven processedEven =
new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
// Task.Delay(100);
processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
}
else
{
ProcessedOdd processedOdd =
new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
// Task.Delay(100);
processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
}
});
stopwatch.Stop();
Console.WriteLine(stopwatch.Elapsed.TotalSeconds);
Console.ReadKey();
}
我也不明白为什么您的代码中需要 Task.Delay(100)
。无论如何,如果没有 await
运算符,异步操作会做一些您可能不会想到的事情。 Ether 使用 await 或使用同步版本 Thread.Sleep(100)
您获得 NullReferenceException
的具体原因是 Dictionary
容器的内部状态已损坏。可能有两个线程试图并行调整 Dictionary
的两个内部数组的大小,或者其他同样令人讨厌的事情。实际上,您很幸运遇到这些异常,因为更糟糕的结果是拥有一个产生错误结果的工作程序。
这个问题更普遍的原因是您允许并行异步访问线程不安全的对象。 Dictionary
class 与大多数内置 .NET classes 一样,不是线程安全的。它是在假设将由单个线程访问(或至少一次由一个线程访问)的情况下实现的。它不包含内部同步。原因是在 class 中添加同步会导致 API 复杂性和性能开销,并且没有理由在每次使用此 class 时都支付此开销,因为它只会在一些特殊情况下需要。
您的问题有多种解决方案。一种是继续使用线程不安全的 Dictionary
,但确保使用锁以独占方式访问它。这是最灵活的解决方案,但您需要非常小心,不允许任何未受保护的代码路径指向该对象。访问 every 属性 和 every 方法,读取 或 写入它,必须是在 lock
里面。所以灵活但脆弱,在竞争激烈的情况下可能成为性能瓶颈(即并发请求排他锁的线程过多,被迫排队等候)。
另一种解决方案是使用像ConcurrentDictionary
这样的线程安全容器。 class 确保其内部状态在被多个线程并行访问时永远不会损坏。不幸的是,它无法确保程序的其余状态。所以它适用于一些简单的情况,除了字典本身,你没有其他共享状态。在这些情况下,它提供了性能改进,因为它是通过细粒度内部锁定实现的(有多个锁,一个用于每个数据段)。
最好的解决方案是通过消除共享状态来完全消除对线程同步的需要。只需让每个线程处理其内部隔离的子集或数据,并且仅在所有线程完成后才合并这些子集。这通常会提供最佳性能,但代价是必须对初始工作负载进行分区,然后编写最终的合并代码。有些库遵循这种策略,但正在处理所有这些样板文件,让您编写尽可能少的代码。最好的之一是 TPL Dataflow library,它实际上嵌入在 .NET Core 平台中。对于 .NET Framework,您需要安装一个包才能使用它。