Parallel.Foreach 在遍历方法调用的结果时将如何表现?
How will Parallel.Foreach behave when Iterating over the results of a method call?
范围:
我目前正在实施一个应用程序,该应用程序使用 Amazon SQS Service 作为该程序处理的数据提供者。
因为我需要对从这个 queue 中提取的消息进行并行处理queue,这就是我所做的。
Parallel.ForEach (GetMessages (msgAttributes), new ParallelOptions { MaxDegreeOfParallelism = threadCount }, message =>
{
// Processing Logic
});
这里是"GetMessages"方法的header:
private static IEnumerable<Message> GetMessages (List<String> messageAttributes = null)
{
// Dequeueing Logic... 10 At a Time
// Yielding the messages to the Parallel Loop
foreach (Message awsMessage in messages)
{
yield return awsMessage;
}
}
这将如何工作?:
我最初想到这将如何工作是 GetMessages
方法将在线程没有工作时执行(或者大量线程没有工作,类似于内部启发式来衡量这个).也就是说,对我来说,GetMessages
方法会将消息分发给 Parallel.For
工作线程,后者将处理消息并等待 Parallel.For handler
向它们提供更多消息工作。
有问题吗?我错了...
事实是,我错了。尽管如此,我仍然不知道在这种情况下发生了什么。
被 dequeued 的消息数量太多了,每次被 dequeued 时,它都会以 2 的幂增长。 dequeueing 计数(消息)如下:
- Dequeue 被调用:Returns 80 条消息
- Dequeue 被调用:Returns 160 条消息
- Dequeue 被调用:Returns 320 条消息(等等)
在某个时间点之后,被删除的消息数量queued,或者在这种情况下,等待我的应用程序处理的消息数量太多,我最终 运行内存。
更多信息:
我正在使用 thread-safe InterLocked
操作来增加上面提到的计数器。
正在使用的线程数是 25(对于 Parallel.Foreach
)
每个 "GetMessages" 将 return 最多 10 条消息(作为 IEnumerable,已生成)。
问题:这个场景到底发生了什么?
我正在 hard-time 试图弄清楚到底发生了什么。我的 GetMessages
方法是否在每个线程完成 "Processing Loop" 后被调用,因此,导致越来越多的消息被 dequeued?
对 "GetMessages" 的调用是由单个线程调用的,还是由多个线程调用的?
我认为 Parallel.ForEach
分区存在问题...您的问题是典型的生产者/消费者场景。对于这种情况,您应该有独立的逻辑,一方面用于出队,另一方面进行处理。它将尊重关注点分离,并将简化调试。
BlockingCollection<T>
将让您将两者分开:一方面,您添加要处理的项目,另一方面,您消耗它们。下面是如何实现它的示例:
您将需要 ParallelExtensionsExtras nuget 包用于 BlockingCollection<T>
工作负载分区(.GetConsumingEnumerable()
在处理方法中)。
public static class ProducerConsumer
{
public static ConcurrentQueue<String> SqsQueue = new ConcurrentQueue<String>();
public static BlockingCollection<String> Collection = new BlockingCollection<String>();
public static ConcurrentBag<String> Result = new ConcurrentBag<String>();
public static async Task TestMethod()
{
// Here we separate all the Tasks in distinct threads
Task sqs = Task.Run(async () =>
{
Console.WriteLine("Amazon on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
while (true)
{
ProducerConsumer.BackgroundFakedAmazon(); // We produce 50 Strings each second
await Task.Delay(1000);
}
});
Task deq = Task.Run(async () =>
{
Console.WriteLine("Dequeue on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
while (true)
{
ProducerConsumer.DequeueData(); // Dequeue 20 Strings each 100ms
await Task.Delay(100);
}
});
Task process = Task.Run(() =>
{
Console.WriteLine("Process on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
ProducerConsumer.BackgroundParallelConsumer(); // Process all the Strings in the BlockingCollection
});
await Task.WhenAll(c, sqs, deq, process);
}
public static void DequeueData()
{
foreach (var i in Enumerable.Range(0, 20))
{
String dequeued = null;
if (SqsQueue.TryDequeue(out dequeued))
{
Collection.Add(dequeued);
Console.WriteLine("Dequeued : " + dequeued);
}
}
}
public static void BackgroundFakedAmazon()
{
Console.WriteLine(" ---------- Generate 50 items on amazon side ---------- ");
foreach (var data in Enumerable.Range(0, 50).Select(i => Path.GetRandomFileName().Split('.').FirstOrDefault()))
SqsQueue.Enqueue(data + " / ASQS");
}
public static void BackgroundParallelConsumer()
{
// Here we stay in Parallel.ForEach, waiting for data. Once processed, we are still waiting the next chunks
Parallel.ForEach(Collection.GetConsumingEnumerable(), (i) =>
{
// Processing Logic
String processedData = "Processed : " + i;
Result.Add(processedData);
Console.WriteLine(processedData);
});
}
}
您可以像这样从控制台应用程序中尝试:
static void Main(string[] args)
{
ProducerConsumer.TestMethod().Wait();
}
范围:
我目前正在实施一个应用程序,该应用程序使用 Amazon SQS Service 作为该程序处理的数据提供者。
因为我需要对从这个 queue 中提取的消息进行并行处理queue,这就是我所做的。
Parallel.ForEach (GetMessages (msgAttributes), new ParallelOptions { MaxDegreeOfParallelism = threadCount }, message =>
{
// Processing Logic
});
这里是"GetMessages"方法的header:
private static IEnumerable<Message> GetMessages (List<String> messageAttributes = null)
{
// Dequeueing Logic... 10 At a Time
// Yielding the messages to the Parallel Loop
foreach (Message awsMessage in messages)
{
yield return awsMessage;
}
}
这将如何工作?:
我最初想到这将如何工作是 GetMessages
方法将在线程没有工作时执行(或者大量线程没有工作,类似于内部启发式来衡量这个).也就是说,对我来说,GetMessages
方法会将消息分发给 Parallel.For
工作线程,后者将处理消息并等待 Parallel.For handler
向它们提供更多消息工作。
有问题吗?我错了...
事实是,我错了。尽管如此,我仍然不知道在这种情况下发生了什么。
被 dequeued 的消息数量太多了,每次被 dequeued 时,它都会以 2 的幂增长。 dequeueing 计数(消息)如下:
- Dequeue 被调用:Returns 80 条消息
- Dequeue 被调用:Returns 160 条消息
- Dequeue 被调用:Returns 320 条消息(等等)
在某个时间点之后,被删除的消息数量queued,或者在这种情况下,等待我的应用程序处理的消息数量太多,我最终 运行内存。
更多信息:
我正在使用 thread-safe InterLocked
操作来增加上面提到的计数器。
正在使用的线程数是 25(对于 Parallel.Foreach
)
每个 "GetMessages" 将 return 最多 10 条消息(作为 IEnumerable,已生成)。
问题:这个场景到底发生了什么?
我正在 hard-time 试图弄清楚到底发生了什么。我的 GetMessages
方法是否在每个线程完成 "Processing Loop" 后被调用,因此,导致越来越多的消息被 dequeued?
对 "GetMessages" 的调用是由单个线程调用的,还是由多个线程调用的?
我认为 Parallel.ForEach
分区存在问题...您的问题是典型的生产者/消费者场景。对于这种情况,您应该有独立的逻辑,一方面用于出队,另一方面进行处理。它将尊重关注点分离,并将简化调试。
BlockingCollection<T>
将让您将两者分开:一方面,您添加要处理的项目,另一方面,您消耗它们。下面是如何实现它的示例:
您将需要 ParallelExtensionsExtras nuget 包用于 BlockingCollection<T>
工作负载分区(.GetConsumingEnumerable()
在处理方法中)。
public static class ProducerConsumer
{
public static ConcurrentQueue<String> SqsQueue = new ConcurrentQueue<String>();
public static BlockingCollection<String> Collection = new BlockingCollection<String>();
public static ConcurrentBag<String> Result = new ConcurrentBag<String>();
public static async Task TestMethod()
{
// Here we separate all the Tasks in distinct threads
Task sqs = Task.Run(async () =>
{
Console.WriteLine("Amazon on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
while (true)
{
ProducerConsumer.BackgroundFakedAmazon(); // We produce 50 Strings each second
await Task.Delay(1000);
}
});
Task deq = Task.Run(async () =>
{
Console.WriteLine("Dequeue on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
while (true)
{
ProducerConsumer.DequeueData(); // Dequeue 20 Strings each 100ms
await Task.Delay(100);
}
});
Task process = Task.Run(() =>
{
Console.WriteLine("Process on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
ProducerConsumer.BackgroundParallelConsumer(); // Process all the Strings in the BlockingCollection
});
await Task.WhenAll(c, sqs, deq, process);
}
public static void DequeueData()
{
foreach (var i in Enumerable.Range(0, 20))
{
String dequeued = null;
if (SqsQueue.TryDequeue(out dequeued))
{
Collection.Add(dequeued);
Console.WriteLine("Dequeued : " + dequeued);
}
}
}
public static void BackgroundFakedAmazon()
{
Console.WriteLine(" ---------- Generate 50 items on amazon side ---------- ");
foreach (var data in Enumerable.Range(0, 50).Select(i => Path.GetRandomFileName().Split('.').FirstOrDefault()))
SqsQueue.Enqueue(data + " / ASQS");
}
public static void BackgroundParallelConsumer()
{
// Here we stay in Parallel.ForEach, waiting for data. Once processed, we are still waiting the next chunks
Parallel.ForEach(Collection.GetConsumingEnumerable(), (i) =>
{
// Processing Logic
String processedData = "Processed : " + i;
Result.Add(processedData);
Console.WriteLine(processedData);
});
}
}
您可以像这样从控制台应用程序中尝试:
static void Main(string[] args)
{
ProducerConsumer.TestMethod().Wait();
}