Restricting the enumerations of LINQ queries to One Only

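I have a LINQ query that should not be enumerated more than once, and I want to avoid enumerating it twice by mistake. Is there an extension method I can use to protect myself from this kind of mistake? I am thinking of something like this: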

var numbers = Enumerable.Range(1, 10).OnlyOnce();
Console.WriteLine(numbers.Count()); // shows 10
Console.WriteLine(numbers.Count()); // throws InvalidOperationException: The query cannot be enumerated more than once.

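The reason I want this functionality is that I have an enumerable of tasks that is intended to instantiate and run the tasks progressively, while it is being enumerated slowly under control. I have already made the mistake of running the tasks twice, because I forgot that it is a deferred enumerable and not an array.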

var tasks = Enumerable.Range(1, 10).Select(n => Task.Run(() => Console.WriteLine(n)));
Task.WaitAll(tasks.ToArray()); // Let's wait for the tasks to finish...
Console.WriteLine(String.Join(", ", tasks.Select(t => t.Id))); // Let's see the completed task IDs...
// Oops! A new set of tasks started running!

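Enumerables enumerate, end of story. You just need to call ToList or ToArray so that the query runs once and the resulting tasks are stored: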

// this will enumerate and start the tasks
var tasks = Enumerable.Range(1, 10)
                      .Select(n => Task.Run(() => Console.WriteLine(n)))
                      .ToList();

// wait for them all to finish
Task.WaitAll(tasks.ToArray());
Console.WriteLine(String.Join(", ", tasks.Select(t => t.Id)));
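
To make the difference concrete, here is a minimal sketch that counts how often the Select lambda runs when a query is enumerated twice, with and without materializing it first. The names DeferredDemo and CountSelectorCalls are made up for this illustration, and a counter stands in for Task.Run.

```csharp
using System;
using System.Linq;

public static class DeferredDemo // hypothetical name, for illustration only
{
    // Enumerates the same query twice and returns how many times the selector ran.
    public static int CountSelectorCalls(bool materialize)
    {
        int calls = 0;
        var query = Enumerable.Range(1, 10).Select(n => { calls++; return n; });

        if (materialize)
        {
            var list = query.ToList();        // the selector runs 10 times, here only
            foreach (var item in list) { }    // reads stored results
            foreach (var item in list) { }    // reads stored results again
        }
        else
        {
            foreach (var item in query) { }   // runs the selector 10 times
            foreach (var item in query) { }   // runs it 10 more times
        }

        return calls;
    }

    public static void Main()
    {
        Console.WriteLine(CountSelectorCalls(materialize: false)); // 20
        Console.WriteLine(CountSelectorCalls(materialize: true));  // 10
    }
}
```

With Task.Run in place of the counter, the first case is exactly the "new set of tasks" from the question.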

Hrm, if you want parallelism:

Parallel.For(0, 100, index => Console.WriteLine(index) );

Or, if you are using the async and await pattern:

public static async Task DoWorkLoads(IEnumerable<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result);

   block.Complete();
   await block.Completion;

}

...

// static, so that the static DoWorkLoads above can pass it as a method group
public static async Task MyMethodAsync(Something result)
{
   await SomethingAsync(result);
}

Update: since you are looking for a way to control the maximum degree of concurrency, you can use this:

public static async Task<IEnumerable<Task>> ExecuteInParallel<T>(
   this IEnumerable<T> collection, Func<T, Task> callback, int degreeOfParallelism)
{
   var queue = new ConcurrentQueue<T>(collection);

   var tasks = Enumerable.Range(0, degreeOfParallelism)
                         .Select(async _ =>
                          {
                             while (queue.TryDequeue(out var item))
                                await callback(item);
                          })
                         .ToArray();

   await Task.WhenAll(tasks);

   return tasks;
}
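
As a rough usage sketch, the method above can be called with a simulated workload while tracking how many callbacks are in flight at once. The class names ParallelExtensions and ThrottleDemo, the helper RunAsync, and the 50 ms delay are assumptions made for this example. The extension method is repeated only so the sample compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelExtensions // hypothetical container class
{
    // Same method as above, repeated so that this sample is self-contained.
    public static async Task<IEnumerable<Task>> ExecuteInParallel<T>(
        this IEnumerable<T> collection, Func<T, Task> callback, int degreeOfParallelism)
    {
        var queue = new ConcurrentQueue<T>(collection);
        var tasks = Enumerable.Range(0, degreeOfParallelism)
                              .Select(async _ =>
                              {
                                  while (queue.TryDequeue(out var item))
                                      await callback(item);
                              })
                              .ToArray();
        await Task.WhenAll(tasks);
        return tasks;
    }
}

public static class ThrottleDemo // hypothetical name
{
    // Runs 10 simulated jobs; returns how many completed and the peak concurrency seen.
    public static async Task<(int Completed, int Peak)> RunAsync(int degreeOfParallelism)
    {
        int running = 0, peak = 0, completed = 0;

        await Enumerable.Range(1, 10).ExecuteInParallel(async n =>
        {
            int now = Interlocked.Increment(ref running);

            // Record the highest number of callbacks observed in flight at once.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)))
                Interlocked.CompareExchange(ref peak, now, seen);

            await Task.Delay(50); // stand-in for real asynchronous work
            Interlocked.Decrement(ref running);
            Interlocked.Increment(ref completed);
        }, degreeOfParallelism);

        return (completed, peak);
    }

    public static async Task Main()
    {
        var (completed, peak) = await RunAsync(3);
        // All 10 jobs complete; the peak should never exceed the requested degree of 3.
        Console.WriteLine($"completed={completed}, peak={peak}");
    }
}
```

Note that the ConcurrentQueue constructor enumerates the whole source up front, so the source should yield plain work items rather than already started tasks.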

I want to avoid enumerating it twice by mistake.

You can wrap the collection in another one that throws an exception if it is enumerated twice.

For example:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

namespace ConsoleApp8
{
    public static class EnumExtension
    {
        class OnceEnumerable<T> : IEnumerable<T>
        {
            IEnumerable<T> col;
            bool hasBeenEnumerated = false;
            public OnceEnumerable(IEnumerable<T> col)
            {
                this.col = col;
            }

            public IEnumerator<T> GetEnumerator()
            {
                if (hasBeenEnumerated)
                {
                    throw new InvalidOperationException("This collection has already been enumerated.");
                }
                this.hasBeenEnumerated = true;
                return col.GetEnumerator();
            }

            IEnumerator IEnumerable.GetEnumerator()
            {
                return GetEnumerator();
            }
        }

        public static IEnumerable<T> OnlyOnce<T>(this IEnumerable<T> col)
        {
            return new OnceEnumerable<T>(col);
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            var col = Enumerable.Range(1, 10).OnlyOnce();

            var colCount = col.Count(); // first enumeration
            foreach (var c in col) // second enumeration: throws InvalidOperationException
            {
                Console.WriteLine(c);
            }
        }
    }
}
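
The wrapper above checks and sets a plain bool, so two threads calling GetEnumerator at the same moment could both pass the check. If that matters in your case, one possible variation is to flip the flag atomically with Interlocked.Exchange. This is only a sketch under that assumption. The names OnceExtensions, ThreadSafeOnceEnumerable, OnlyOnceThreadSafe and OnceDemo are made up for this example.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class OnceExtensions // hypothetical name
{
    private sealed class ThreadSafeOnceEnumerable<T> : IEnumerable<T>
    {
        private readonly IEnumerable<T> source;
        private int enumerated; // 0 = not yet enumerated, 1 = enumerator already handed out

        public ThreadSafeOnceEnumerable(IEnumerable<T> source)
        {
            this.source = source ?? throw new ArgumentNullException(nameof(source));
        }

        public IEnumerator<T> GetEnumerator()
        {
            // Atomically set the flag; only the first caller sees the old value 0.
            if (Interlocked.Exchange(ref enumerated, 1) != 0)
            {
                throw new InvalidOperationException("This collection has already been enumerated.");
            }
            return source.GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    }

    public static IEnumerable<T> OnlyOnceThreadSafe<T>(this IEnumerable<T> source)
        => new ThreadSafeOnceEnumerable<T>(source);
}

public static class OnceDemo // hypothetical name
{
    public static void Main()
    {
        var numbers = Enumerable.Range(1, 10).OnlyOnceThreadSafe();

        Console.WriteLine(numbers.Count()); // first enumeration succeeds and prints 10
        try
        {
            Console.WriteLine(numbers.Count()); // second enumeration throws
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

Like the original wrapper, this only guards calls to GetEnumerator. It does not make the returned enumerator itself safe to share between threads.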

Rx is certainly an option for controlling parallelism.

var query =
    Observable
        .Range(1, 10)
        .Select(n => Observable.FromAsync(() => Task.Run(() => new { Id = n })));

var tasks = query.Merge(maxConcurrent: 3).ToArray().Wait();

Console.WriteLine(String.Join(", ", tasks.Select(t => t.Id)));