Parallel.Foreach 在遍历方法调用的结果时将如何表现?

How will Parallel.Foreach behave when Iterating over the results of a method call?

范围:

我目前正在实施一个应用程序,该应用程序使用 Amazon SQS Service 作为该程序处理的数据提供者。

因为我需要对从这个 queue 中提取的消息进行并行处理queue,这就是我所做的。

Parallel.ForEach (GetMessages (msgAttributes), new ParallelOptions { MaxDegreeOfParallelism = threadCount }, message =>
        {
             // Processing Logic
        });

这里是"GetMessages"方法的header:

    private static IEnumerable<Message> GetMessages (List<String> messageAttributes = null)
    {
        // Dequeueing Logic... 10 At a Time

        // Yielding the messages to the Parallel Loop
        foreach (Message awsMessage in messages)
        {
           yield return awsMessage;
        }
    }

这将如何工作?:

我最初想到这将如何工作是 GetMessages 方法将在线程没有工作时执行(或者大量线程没有工作,类似于内部启发式来衡量这个).也就是说,对我来说,GetMessages 方法会将消息分发给 Parallel.For 工作线程,后者将处理消息并等待 Parallel.For handler 向它们提供更多消息工作。

有问题吗?我错了...

事实是,我错了。尽管如此,我仍然不知道在这种情况下发生了什么。

被 dequeued 的消息数量太多了,每次被 dequeued 时,它都会以 2 的幂增长。 dequeueing 计数(消息)如下:

  1. Dequeue 被调用:Returns 80 条消息
  2. Dequeue 被调用:Returns 160 条消息
  3. Dequeue 被调用:Returns 320 条消息(等等)

在某个时间点之后,被删除的消息数量queued,或者在这种情况下,等待我的应用程序处理的消息数量太多,我最终 运行内存。

更多信息:

我正在使用 thread-safe InterLocked 操作来增加上面提到的计数器。

正在使用的线程数是 25(对于 Parallel.Foreach

每个 "GetMessages" 将 return 最多 10 条消息(作为 IEnumerable,已生成)。

问题:这个场景到底发生了什么?

我正在 hard-time 试图弄清楚到底发生了什么。我的 GetMessages 方法是否在每个线程完成 "Processing Loop" 后被调用,因此,导致越来越多的消息被 dequeued?

对 "GetMessages" 的调用是由单个线程调用的,还是由多个线程调用的?

我认为 Parallel.ForEach 分区存在问题...您的问题是典型的生产者/消费者场景。对于这种情况,您应该有独立的逻辑,一方面用于出队,另一方面进行处理。它将尊重关注点分离,并将简化调试。

BlockingCollection<T> 将让您将两者分开:一方面,您添加要处理的项目,另一方面,您消耗它们。下面是如何实现它的示例:

您将需要 ParallelExtensionsExtras nuget 包用于 BlockingCollection<T> 工作负载分区(.GetConsumingEnumerable() 在处理方法中)。

public static class ProducerConsumer
{
    public static ConcurrentQueue<String> SqsQueue = new ConcurrentQueue<String>();         
    public static BlockingCollection<String> Collection = new BlockingCollection<String>();
    public static ConcurrentBag<String> Result = new ConcurrentBag<String>();

    public static async Task TestMethod()
    {
        // Here we separate all the Tasks in distinct threads
        Task sqs = Task.Run(async () =>
        {
            Console.WriteLine("Amazon on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
            while (true)
            {
                ProducerConsumer.BackgroundFakedAmazon(); // We produce 50 Strings each second
                await Task.Delay(1000);
            }
        });
        Task deq = Task.Run(async () =>
        {
            Console.WriteLine("Dequeue on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
            while (true)
            {
                ProducerConsumer.DequeueData(); // Dequeue 20 Strings each 100ms 
                await Task.Delay(100);
            }
        });

        Task process = Task.Run(() =>
        {
            Console.WriteLine("Process on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
            ProducerConsumer.BackgroundParallelConsumer(); // Process all the Strings in the BlockingCollection
        });

        await Task.WhenAll(c, sqs, deq, process);
    }

    public static void DequeueData()
    {
        foreach (var i in Enumerable.Range(0, 20))
        {
            String dequeued = null;
            if (SqsQueue.TryDequeue(out dequeued))
            {
                Collection.Add(dequeued);
                Console.WriteLine("Dequeued : " + dequeued);
            }
        }
    }

    public static void BackgroundFakedAmazon()
    {
        Console.WriteLine(" ---------- Generate 50 items on amazon side  ---------- ");
        foreach (var data in Enumerable.Range(0, 50).Select(i => Path.GetRandomFileName().Split('.').FirstOrDefault()))
            SqsQueue.Enqueue(data + " / ASQS");
    }

    public static void BackgroundParallelConsumer()
    {
        // Here we stay in Parallel.ForEach, waiting for data. Once processed, we are still waiting the next chunks
        Parallel.ForEach(Collection.GetConsumingEnumerable(), (i) =>
        {
            // Processing Logic
            String processedData = "Processed : " + i;
            Result.Add(processedData);
            Console.WriteLine(processedData);
        });

    }
}

您可以像这样从控制台应用程序中尝试:

static void Main(string[] args)
{
    ProducerConsumer.TestMethod().Wait();
}