使用比预期更早返回的 EventWaitHandles 的方法
Method using EventWaitHandles returning earlier than expected
我创建了一个简单的方法,它应该获取一个列表,比如 100 个项目,并异步处理它们(一次最多 MAX_CONCURRENT
个元素),并且 return仅在处理完所有元素后:
/// <summary>Generic method to perform an action or set of actions
/// in parallel on each item in a collection of items, returning
/// only when all actions have been completed.</summary>
/// <typeparam name="T">The element type</typeparam>
/// <param name="elements">A collection of elements, each of which to
/// perform the action on.</param>
/// <param name="action">The action to perform on each element. The
/// action should of course be thread safe.</param>
/// <param name="MAX_CONCURRENT">The maximum number of concurrent actions.</param>
public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
// Semaphore limiting the number of parallel requests
Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);
// Count of the number of remaining threads to be completed
int remaining = 0;
// Signal to notify the main thread when a worker is done
AutoResetEvent onComplete = new AutoResetEvent(false);
foreach (T element in elements)
{
Interlocked.Increment(ref remaining);
limit.WaitOne();
new Thread(() =>
{
try
{
action(element);
}
catch (Exception ex)
{
Console.WriteLine("Error performing concurrent action: " + ex);
}
finally
{
Interlocked.Decrement(ref remaining);
limit.Release();
onComplete.Set();
}
}).Start();
}
// Wait for all requests to complete
while (remaining > 0)
onComplete.WaitOne(10); // Slightly better than Thread.Sleep(10)
}
/* We include a timeout on the `WaitOne()` before checking `remaining` again
* to protect against the rare case where the last outstanding thread
* decrements 'remaining' and then signals completion *between* the main thread
* checking 'remaining' and waiting for the next completion signal, which would
* otherwise result in the main thread missing the last signal and locking forever. */
大多数时候,这段代码的行为完全符合预期,但在极少数情况下,我发现列表中每个元素之前的方法 returns(即跳出最后的 while 循环)是处理完毕。当只剩下几个元素时,它似乎总是会发生 - 例如我将处理 97 个元素,然后是方法 returns,然后是完成的元素 98-100。
我是否做错了什么可能导致 remaining
计数在所有元素实际处理之前达到 0?
这是一个修改后的解决方案,它使用 CountdownEvent
信号来避免使用 remaining
整数,并避免在使用不可靠的 AutoResetEvent onComplete
轮询它时所涉及的繁忙等待:
public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
int threads = MaxConcurrent ?? DefaultMaxConcurrentRequests;
// Ensure elements is only enumerated once.
elements = elements as T[] ?? elements.ToArray();
// Semaphore limiting the number of parallel requests
Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);
// Count of the number of remaining threads to be completed
CountdownEvent remaining = new CountdownEvent(elements.Count());
foreach (T element in elements)
{
limit.WaitOne();
new Thread(() =>
{
try
{
action(element);
}
catch (Exception ex)
{
Console.WriteLine("Error performing concurrent action: " + ex);
}
finally
{
remaining.Signal();
limit.Release();
}
}).Start();
}
// Wait for all requests to complete
remaining.Wait();
}
正在进行测试,看看是否能解决问题。
我创建了一个简单的方法,它应该获取一个列表,比如 100 个项目,并异步处理它们(一次最多 MAX_CONCURRENT
个元素),并且 return仅在处理完所有元素后:
/// <summary>Generic method to perform an action or set of actions
/// in parallel on each item in a collection of items, returning
/// only when all actions have been completed.</summary>
/// <typeparam name="T">The element type</typeparam>
/// <param name="elements">A collection of elements, each of which to
/// perform the action on.</param>
/// <param name="action">The action to perform on each element. The
/// action should of course be thread safe.</param>
/// <param name="MAX_CONCURRENT">The maximum number of concurrent actions.</param>
public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
// Semaphore limiting the number of parallel requests
Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);
// Count of the number of remaining threads to be completed
int remaining = 0;
// Signal to notify the main thread when a worker is done
AutoResetEvent onComplete = new AutoResetEvent(false);
foreach (T element in elements)
{
Interlocked.Increment(ref remaining);
limit.WaitOne();
new Thread(() =>
{
try
{
action(element);
}
catch (Exception ex)
{
Console.WriteLine("Error performing concurrent action: " + ex);
}
finally
{
Interlocked.Decrement(ref remaining);
limit.Release();
onComplete.Set();
}
}).Start();
}
// Wait for all requests to complete
while (remaining > 0)
onComplete.WaitOne(10); // Slightly better than Thread.Sleep(10)
}
/* We include a timeout on the `WaitOne()` before checking `remaining` again
* to protect against the rare case where the last outstanding thread
* decrements 'remaining' and then signals completion *between* the main thread
* checking 'remaining' and waiting for the next completion signal, which would
* otherwise result in the main thread missing the last signal and locking forever. */
大多数时候,这段代码的行为完全符合预期,但在极少数情况下,我发现列表中每个元素之前的方法 returns(即跳出最后的 while 循环)是处理完毕。当只剩下几个元素时,它似乎总是会发生 - 例如我将处理 97 个元素,然后是方法 returns,然后是完成的元素 98-100。
我是否做错了什么可能导致 remaining
计数在所有元素实际处理之前达到 0?
这是一个修改后的解决方案,它使用 CountdownEvent
信号来避免使用 remaining
整数,并避免在使用不可靠的 AutoResetEvent onComplete
轮询它时所涉及的繁忙等待:
public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
int threads = MaxConcurrent ?? DefaultMaxConcurrentRequests;
// Ensure elements is only enumerated once.
elements = elements as T[] ?? elements.ToArray();
// Semaphore limiting the number of parallel requests
Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);
// Count of the number of remaining threads to be completed
CountdownEvent remaining = new CountdownEvent(elements.Count());
foreach (T element in elements)
{
limit.WaitOne();
new Thread(() =>
{
try
{
action(element);
}
catch (Exception ex)
{
Console.WriteLine("Error performing concurrent action: " + ex);
}
finally
{
remaining.Signal();
limit.Release();
}
}).Start();
}
// Wait for all requests to complete
remaining.Wait();
}
正在进行测试,看看是否能解决问题。