任务合并结果并继续

Tasks combine result and continue

我有 16 个任务在做同样的工作,每个任务 return 一个数组。我想将结果成对组合并做同样的工作,直到我只有一项任务。我不知道最好的方法是什么。

public static IComparatorNetwork[] Prune(IComparatorNetwork[] nets, int numTasks)
    {
        var tasks = new Task[numTasks];
        var netsPerTask = nets.Length/numTasks;
        var start = 0;
        var concurrentSet = new ConcurrentBag<IComparatorNetwork>();
        
        for(var i = 0; i  < numTasks; i++)
        {
            IComparatorNetwork[] taskNets;
            if (i == numTasks - 1)
            {
                taskNets = nets.Skip(start).ToArray();                 
            }
            else
            {
                taskNets = nets.Skip(start).Take(netsPerTask).ToArray();
            }

            start += netsPerTask;
            tasks[i] = Task.Factory.StartNew(() =>
            {
                var pruner = new Pruner();
                concurrentSet.AddRange(pruner.Prune(taskNets));
            });
        }

        Task.WaitAll(tasks.ToArray());

        if(numTasks > 1)
        {
            return Prune(concurrentSet.ToArray(), numTasks/2);
        }

        return concurrentSet.ToArray();
    }

现在我正在等待所有任务完成然后我重复一半的任务直到我只有一个。我不想在每次迭代中都等待所有。我对并行编程很陌生,可能这种方法不好。 我尝试并行化的代码如下:

public IComparatorNetwork[] Prune(IComparatorNetwork[] nets)
    {
        var result = new List<IComparatorNetwork>();

        for (var i = 0; i < nets.Length; i++) 
        {
            var isSubsumed = false;

            for (var index = result.Count - 1; index >= 0; index--)
            {
                var n = result[index];

                if (nets[i].IsSubsumed(n))
                {
                    isSubsumed = true;
                    break;
                }

                if (n.IsSubsumed(nets[i]))
                {
                    result.Remove(n);
                }
            }

            if (!isSubsumed) 
            {
                result.Add(nets[i]);
            }
        }

        return result.ToArray();
    }`

所以你在这里所做的基本上是聚合值,但是是并行的。幸运的是,PLINQ 已经有一个并行工作的聚合实现。因此,在您的情况下,您可以简单地将原始数组中的每个元素包装在它自己的一个元素数组中,然后您的 Prune 操作能够将任意两个网络数组组合成一个新的单个数组。

public static IComparatorNetwork[] Prune(IComparatorNetwork[] nets)
{
    return nets.Select(net => new[] { net })
        .AsParallel()
        .Aggregate((a, b) => new Pruner().Prune(a.Concat(b).ToArray()));
}

我对他们的聚合方法的内部结构不是很了解,但我想它可能非常好并且不会花费大量时间进行不必要的等待。但是,如果你想自己写,这样你就可以确保工作人员总是在他们的新工作开始后立即投入新工作,这是我自己的实现。请根据您的具体情况随意比较两者,看看哪个最适合您的需求。请注意,PLINQ 可以通过多种方式进行配置,请随意尝试其他配置,看看哪种配置最适合您的情况。

public static T AggregateInParallel<T>(this IEnumerable<T> values, Func<T, T, T> function, int numTasks)
{
    Queue<T> queue = new Queue<T>();
    foreach (var value in values)
        queue.Enqueue(value);
    if (!queue.Any())
        return default(T);  //Consider throwing or doing something else here if the sequence is empty

    (T, T)? GetFromQueue()
    {
        lock (queue)
        {
            if (queue.Count >= 2)
            {
                return (queue.Dequeue(), queue.Dequeue());
            }
            else
            {
                return null;
            }
        }
    }

    var tasks = Enumerable.Range(0, numTasks)
        .Select(_ => Task.Run(() =>
        {
            var pair = GetFromQueue();
            while (pair != null)
            {
                var result = function(pair.Value.Item1, pair.Value.Item2);
                lock (queue)
                {
                    queue.Enqueue(result);
                }
                pair = GetFromQueue();
            }
        }))
        .ToArray();
    Task.WaitAll(tasks);
    return queue.Dequeue();
}

此版本的调用代码如下所示:

public static IComparatorNetwork[] Prune2(IComparatorNetwork[] nets)
{
    return nets.Select(net => new[] { net })
        .AggregateInParallel((a, b) => new Pruner().Prune(a.Concat(b).ToArray()), nets.Length / 2);
}

如评论中所述,您可以让修剪器的 Prune 方法更有效,方法是让它接受两个集合,而不仅仅是一个,并且只比较每个集合中的项目与另一个集合,知道所有项目来自同一集合的元素不会包含该集合中的任何其他元素。这使得该方法不仅更短、更简单、更容易理解,而且还删除了相当大一部分昂贵的比较。一些小的调整也可以大大减少创建的中间集合的数量。

public static IReadOnlyList<IComparatorNetwork> Prune(IReadOnlyList<IComparatorNetwork> first, IReadOnlyList<IComparatorNetwork> second)
{
    var firstItemsNotSubsumed = first.Where(outerNet => !second.Any(innerNet => outerNet.IsSubsumed(innerNet)));
    var secondItemsNotSubsumed = second.Where(outerNet => !first.Any(innerNet => outerNet.IsSubsumed(innerNet)));
    return firstItemsNotSubsumed.Concat(secondItemsNotSubsumed).ToList();
}

调用代码只需要稍作调整即可确保类型匹配,并且您传入两个集合而不是先连接它们。

public static IReadOnlyList<IComparatorNetwork> Prune(IReadOnlyList<IComparatorNetwork> nets)
{
    return nets.Select(net => (IReadOnlyList<IComparatorNetwork>)new[] { net })
        .AggregateInParallel((a, b) => Pruner.Prune(a, b), nets.Count / 2);
}