Parallel.Foreach 在两个 foreach 循环中

Parallel.Foreach in two foreach loops

我有如下按一列分组的对象列表。

我有数百万条记录,需要 30 多分钟。如何高效地编写以下代码?

List<Voter> voterList = new List<Voter>();

IEnumerable<IGrouping<string, MemberInfo>> groupByLastName = infoList.GroupBy(info => info.LastName).Select(i => i);

foreach (List<MemberInfo> lastName in groupByLastName)
{
    foreach (MemberInfo member in lastName)
    {
        MemberInfo info = memberService.GetMemberDetails(member.FirstName);

        if (info.Age > 18)
        {
            voterList.Add(new Voter{
                VoterId = member.VoterId,
                Age = member.Age
            });
        }
    }
}
  1. 使用Parallel.ForEach

  2. 将结果对象添加到线程安全集合中。

下面是一些伪代码。我无法判断您有哪些物品可供使用,因为您的原始 post 多次使用 "var"。

 BlockingCollection<Voter> bc = new BlockingCollection<Voter>();


 Parallel.ForEach(myCollection, (e) => { 

     bc.Add(e);

 });

所以我会先收集你所有的输入值(到你的 wcf)

ICollection allTheInputLastNames = new List();

IEnumerable<IGrouping<string, MemberInfo>> groupByLastName = infoList.GroupBy(info => info.LastName).Select(i => i);

foreach (List<MemberInfo> lastName in groupByLastName)
{
    foreach (MemberInfo member in lastName)
    {
       allTheInputLastNames.add(member.FirstName);
    }      
}

现在,因为你有这么多,希望它运行得相当快。

现在您已经收集了所有输入,您想要使用 Parallel.ForEach。

我在下面创建了一个通用示例。

我有 inputValues,你有 allTheInputLastNames。

在我创建新的 ResultObject 的地方,您可以进行 wcf-service 调用。

在我做 "StringLength % 2" 的地方,你会 info.Age 检查。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MyApp.ParallelStuff
{
    public class ParallelExampleOne
    {

        public void ExampleOne()
        {

            ICollection<string> inputValues = new List<string>();

            for (int i = 1; i < 10000; i++)
            {
                inputValues.Add("MyValue" + Convert.ToString(i));
            }

            CancellationTokenSource ct = new CancellationTokenSource();

            BlockingCollection<ResultObject> finalItems = new BlockingCollection<ResultObject>();

            Parallel.ForEach(inputValues, (currentInputItem) =>
            {
                ResultObject ro = new ResultObject(currentInputItem.Length, currentInputItem);

                if (ro.StringLength % 2 == 0)
                {
                    finalItems.Add(ro);
                }

            });

            Console.WriteLine("ExampleOne.finalItems.Count={0}", finalItems.Count);
            string temp = string.Empty;
        }

        public void ExampleTwo()
        {

            ICollection<string> inputValues = new List<string>();
            for (int i = 1; i < 10000; i++)
            {
                inputValues.Add("MyValue" + Convert.ToString(i));
            }

            CancellationTokenSource ct = new CancellationTokenSource();

            BlockingCollection<ResultObject> finalItems = new BlockingCollection<ResultObject>();

            ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ct.Token };

            ParallelLoopResult results = Parallel.ForEach(inputValues, options, currentInputValue =>
            {
                ResultObject ro = new ResultObject(currentInputValue.Length, currentInputValue);

                if (ro.StringLength % 2 == 0)
                {
                    finalItems.Add(ro);
                }

            });

            Console.WriteLine("ExampleTwo.finalItems.Count={0}", finalItems.Count);

            string temp = string.Empty;
        }



    internal class ResultObject
    {
        internal int StringLength { get; private set; }
        internal string OutputValue { get; private set; }

        public ResultObject(int stringLength, string inputValue)
        {
            this.StringLength = stringLength;
            this.OutputValue = inputValue + "MyOutputSuffix";
        }
    }

}

另请注意如何阅读我的代码,因为我没有使用 "var" 作为我的变量声明。