C# 多线程处理大文件,每批 100 行

c# multi threading process large file lines in batches of 100

我有一个包含 500,000,000 行的文件。

这些行是最多 10 个字符的字符串。

如何使用多线程处理此文件并以 100 个为一批?

使用 MoreLinq 的 Batch 方法,可以得到一个由 IEnumerable<string> 组成的序列,其中每个元素是一批 100 行;然后为每一批 100 行启动一个新任务。

这是一个基本的实现。更明智的做法是使用 Semaphore 限制任意时刻同时运行的任务数量,并评估 File.ReadAllLines 在一次性读入 500,000,000 行时的性能开销。

/// <summary>
/// Processes the lines of "File.txt" in batches of 100 on thread-pool tasks.
/// </summary>
public class FileProcessor
{
    /// <summary>
    /// Streams "File.txt" line by line, groups the lines into batches of 100 and
    /// processes each batch on its own task, with at most one in-flight batch per
    /// logical processor.
    /// </summary>
    /// <returns>A task that completes once every batch has been processed.</returns>
    /// <remarks>
    /// The file is read with <c>File.ReadLines</c> rather than <c>File.ReadAllLines</c>
    /// so the whole file is never held in memory at once. If any batch fails, the
    /// failure is surfaced when the final <c>Task.WhenAll</c> is awaited.
    /// </remarks>
    public async Task ProcessFile()
    {
        int maxConcurrency = System.Environment.ProcessorCount;

        // Bound on how many finished tasks are kept around before pruning.
        int pruneThreshold = maxConcurrency * 4;

        List<Task> running = new List<Task>();

        using (var throttle = new System.Threading.SemaphoreSlim(maxConcurrency))
        {
            foreach (IEnumerable<string> linesBatch in File.ReadLines("File.txt").Batch(100))
            {
                // Stop reading ahead until one of the running batches has finished.
                await throttle.WaitAsync();

                IEnumerable<string> localLinesBatch = linesBatch;
                running.Add(Task.Run(() =>
                {
                    try
                    {
                        // Perform operation on localLinesBatch
                    }
                    finally
                    {
                        // Always free the slot, even when processing throws.
                        throttle.Release();
                    }
                }));

                // Drop successfully completed tasks so the list does not grow with
                // the file; faulted/cancelled tasks stay so WhenAll can report them.
                if (running.Count >= pruneThreshold)
                {
                    running.RemoveAll(t => t.Status == TaskStatus.RanToCompletion);
                }
            }

            await Task.WhenAll(running);
        }
    }
}

/// <summary>
/// LINQ-style helpers for splitting sequences into batches.
/// </summary>
public static class LinqExtensions
{
    /// <summary>
    /// Splits <paramref name="source"/> into consecutive batches of at most
    /// <paramref name="size"/> elements, preserving the original order.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to split; enumerated lazily, once.</param>
    /// <param name="size">Maximum number of elements per batch; must be positive.</param>
    /// <returns>
    /// A lazy sequence of batches. Every batch holds exactly <paramref name="size"/>
    /// elements except possibly the last, which holds the remainder.
    /// </returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="size"/> is zero or negative.</exception>
    public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(
              this IEnumerable<TSource> source, int size)
    {
        // Validate here (not inside the iterator) so bad arguments fail at the
        // call site instead of on first enumeration.
        if (source == null)
        {
            throw new System.ArgumentNullException("source");
        }

        if (size <= 0)
        {
            throw new System.ArgumentOutOfRangeException(
                "size", size, "Batch size must be greater than zero.");
        }

        return BatchIterator(source, size);
    }

    // Lazily fills fixed-size buckets from the source and yields each one when full.
    private static IEnumerable<IEnumerable<TSource>> BatchIterator<TSource>(
        IEnumerable<TSource> source, int size)
    {
        TSource[] bucket = null;
        int count = 0;

        foreach (TSource item in source)
        {
            if (bucket == null)
            {
                bucket = new TSource[size];
            }

            bucket[count++] = item;

            if (count == size)
            {
                yield return bucket;

                // A fresh array is allocated for the next batch so consumers can
                // keep a reference to the one just yielded.
                bucket = null;
                count = 0;
            }
        }

        // A non-null bucket here always holds between 1 and size - 1 items.
        if (bucket != null)
        {
            System.Array.Resize(ref bucket, count);
            yield return bucket;
        }
    }
}

如果您使用来自内置 TPL 的 Parallel.ForEach 并编写几个枚举器(如下所列),则不需要使用其他库。您的代码可能如下所示:

// Read the file through a StreamReader, group the lines into chunks of 100 and
// hand each chunk to Parallel.ForEach for processing.
using (var input = new StreamReader(File.OpenRead(@"c:\path\to\my\file.txt")))
{
    Parallel.ForEach(
        input.ReadLines().TakeChunks(100),
        // Limit the number of concurrent workers to the number of logical
        // processors instead of a hard-coded value.
        new ParallelOptions() { MaxDegreeOfParallelism = System.Environment.ProcessorCount },
        batchOfLines => {
            DoMyProcessing(batchOfLines);
        });
}

为此,您需要在 IEnumerable<T> 上使用几个扩展方法和几个枚举器,定义如下:

/// <summary>
/// Extension methods, plus their supporting enumerables/enumerators, for reading
/// a <see cref="StreamReader"/> line by line and for grouping any sequence into
/// fixed-size chunks.
/// </summary>
public static class EnumerableExtensions
{
    /// <summary>
    /// Exposes the lines of <paramref name="input"/> as a sequence of strings.
    /// </summary>
    /// <param name="input">Reader to pull lines from.</param>
    /// <returns>A sequence that reads one line from the reader per element.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    /// <remarks>
    /// Every enumerator of the returned sequence reads from the same reader and
    /// disposes it when the enumerator is disposed, so the sequence is only
    /// meaningful for a single enumeration.
    /// </remarks>
    public static IEnumerable<string> ReadLines(this StreamReader input)
    {
        if (input == null)
        {
            throw new ArgumentNullException("input");
        }

        return new LineReadingEnumerable(input);
    }

    /// <summary>
    /// Groups <paramref name="source"/> into consecutive chunks of at most
    /// <paramref name="length"/> elements.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">Sequence to split.</param>
    /// <param name="length">Maximum number of elements per chunk; must be positive.</param>
    /// <returns>
    /// A sequence of chunks; every chunk is full except possibly the last one.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="length"/> is zero or negative.</exception>
    public static IEnumerable<IReadOnlyList<T>> TakeChunks<T>(this IEnumerable<T> source, int length)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        // A non-positive length would otherwise produce an empty result and
        // silently drop every element of the source.
        if (length <= 0)
        {
            throw new ArgumentOutOfRangeException(
                "length", length, "Chunk length must be greater than zero.");
        }

        return new ChunkingEnumerable<T>(source, length);
    }

    /// <summary>
    /// Enumerable view over the lines of a <see cref="StreamReader"/>.
    /// </summary>
    public class LineReadingEnumerable : IEnumerable<string>
    {
        private readonly StreamReader _input;

        /// <summary>Wraps <paramref name="input"/>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
        public LineReadingEnumerable(StreamReader input)
        {
            if (input == null)
            {
                throw new ArgumentNullException("input");
            }

            _input = input;
        }

        /// <summary>Creates an enumerator that reads from the wrapped reader.</summary>
        public IEnumerator<string> GetEnumerator()
        {
            return new LineReadingEnumerator(_input);
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

    /// <summary>
    /// Enumerator that returns one line of a <see cref="StreamReader"/> per step.
    /// </summary>
    public class LineReadingEnumerator : IEnumerator<string>
    {
        private readonly StreamReader _input;
        private string _current;

        /// <summary>Wraps <paramref name="input"/>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
        public LineReadingEnumerator(StreamReader input)
        {
            if (input == null)
            {
                throw new ArgumentNullException("input");
            }

            _input = input;
        }

        /// <summary>Disposes the underlying reader.</summary>
        public void Dispose()
        {
            _input.Dispose();
        }

        /// <summary>
        /// Reads the next line; returns false once the reader has no more lines.
        /// </summary>
        public bool MoveNext()
        {
            _current = _input.ReadLine();
            return _current != null;
        }

        /// <summary>Not supported.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public void Reset()
        {
            throw new NotSupportedException();
        }

        /// <summary>The most recently read line, or null after the end was reached.</summary>
        public string Current
        {
            get { return _current; }
        }

        object IEnumerator.Current
        {
            get { return _current; }
        }
    }

    /// <summary>
    /// Enumerable that presents an inner sequence as chunks of a fixed maximum length.
    /// </summary>
    public class ChunkingEnumerable<T> : IEnumerable<IReadOnlyList<T>>
    {
        private readonly IEnumerable<T> _inner;
        private readonly int _length;

        /// <summary>Wraps <paramref name="inner"/> with the given chunk length.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="inner"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="length"/> is zero or negative.</exception>
        public ChunkingEnumerable(IEnumerable<T> inner, int length)
        {
            if (inner == null)
            {
                throw new ArgumentNullException("inner");
            }

            if (length <= 0)
            {
                throw new ArgumentOutOfRangeException(
                    "length", length, "Chunk length must be greater than zero.");
            }

            _inner = inner;
            _length = length;
        }

        /// <summary>Creates a chunking enumerator over a new enumerator of the inner sequence.</summary>
        public IEnumerator<IReadOnlyList<T>> GetEnumerator()
        {
            return new ChunkingEnumerator<T>(_inner.GetEnumerator(), _length);
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }
    }

    /// <summary>
    /// Enumerator that pulls up to a fixed number of items from an inner enumerator
    /// per step and exposes them as one read-only list.
    /// </summary>
    public class ChunkingEnumerator<T> : IEnumerator<IReadOnlyList<T>>
    {
        private readonly IEnumerator<T> _inner;
        private readonly int _length;
        private IReadOnlyList<T> _current;
        private bool _endOfInner;

        /// <summary>Wraps <paramref name="inner"/> with the given chunk length.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="inner"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="length"/> is zero or negative.</exception>
        public ChunkingEnumerator(IEnumerator<T> inner, int length)
        {
            if (inner == null)
            {
                throw new ArgumentNullException("inner");
            }

            if (length <= 0)
            {
                throw new ArgumentOutOfRangeException(
                    "length", length, "Chunk length must be greater than zero.");
            }

            _inner = inner;
            _length = length;
        }

        /// <summary>Disposes the inner enumerator and clears the current chunk.</summary>
        public void Dispose()
        {
            _inner.Dispose();
            _current = null;
        }

        /// <summary>
        /// Builds the next chunk; returns false when the inner enumerator yields
        /// no further items.
        /// </summary>
        public bool MoveNext()
        {
            // A new list per chunk, so chunks already handed out are never mutated.
            var currentBuffer = new List<T>();

            while (currentBuffer.Count < _length && !_endOfInner)
            {
                if (!_inner.MoveNext())
                {
                    // Remember exhaustion so the inner enumerator is not advanced again.
                    _endOfInner = true;
                    break;
                }

                currentBuffer.Add(_inner.Current);
            }

            if (currentBuffer.Count > 0)
            {
                _current = currentBuffer;
                return true;
            }

            _current = null;
            return false;
        }

        /// <summary>
        /// Resets the inner enumerator and this enumerator's state; throws whatever
        /// the inner enumerator's Reset throws.
        /// </summary>
        public void Reset()
        {
            _inner.Reset();
            _current = null;
            _endOfInner = false;
        }

        /// <summary>The chunk produced by the last successful <see cref="MoveNext"/>.</summary>
        /// <exception cref="InvalidOperationException">There is no current chunk.</exception>
        public IReadOnlyList<T> Current
        {
            get
            {
                if (_current != null)
                {
                    return _current;
                }

                throw new InvalidOperationException();
            }
        }

        object IEnumerator.Current
        {
            get
            {
                return this.Current;
            }
        }
    }
}