将流处理为 IAsyncEnumerable - 流不可读

Processing a Stream into IAsyncEnumerable - Stream is not readable

我有一个工作流程,我尝试执行以下操作:

我在下面创建了一个最小的复制示例:

class Program
{
    private static async Task<Stream> GetStream()
    {
        var text =
            @"Multi-line
            string";

        await Task.Yield();

        var bytes = Encoding.UTF8.GetBytes(text);
        return new MemoryStream(bytes);
    }

    private static async Task<T> StreamData<T>(Func<Stream, T> streamAction)
    {
        await using var stream = await GetStream();
        return streamAction(stream);
    }

    private static async Task StreamData(Func<Stream, Task> streamAction)
    {
        await using var stream = await GetStream();
        await streamAction(stream);
    }

    private static async IAsyncEnumerable<string> GetTextLinesFromStream(Stream stream)
    {
        using var reader = new StreamReader(stream);

        var line = await reader.ReadLineAsync();
        while (line != null)
        {
            yield return line;
            line = await reader.ReadLineAsync();
        }
    }

    private static async Task Test1()
    {
        async Task GetRecords(Stream str)
        {
            await foreach(var line in GetTextLinesFromStream(str))
                Console.WriteLine(line);
        }

        await StreamData(GetRecords);
    }

    private static async Task Test2()
    {
        await foreach(var line in await StreamData(GetTextLinesFromStream))
            Console.WriteLine(line);
    }

    static async Task Main(string[] args)
    {
        await Test1();
        await Test2();
    }
}  

在这里,方法 Test1 工作正常,而 Test2 没有,失败 Stream is not readable。问题是在第二种情况下,当代码开始处理实际流时,流已经被处理掉了。

大概这两个例子的区别在于,对于第一个例子,读取流是在仍在一次性 stream 的上下文中执行的,而在第二个例子中我们已经在外面了。

但是,我认为第二种情况也可能有效 - 至少我觉得它非常符合 C# 惯用语。为了让第二种情况也能正常工作,我还缺少什么吗?

Test2 方法的问题在于 Stream 在创建 IAsyncEnumerable<string> 时释放,而不是在其枚举完成时释放。

Test2 方法使用第一个 StreamData 重载,return 是 Task<T> 的重载。本例中的 TIAsyncEnumerable<string>。因此 StreamData 方法 return 是一个生成异步序列的任务,然后立即处理流(在生成序列之后)。显然,现在不是处理流的合适时机。正确的时机是在 await foreach 循环完成之后。

为了使 Test2 透明地工作,您应该添加 StreamData 方法的第三个重载,即 return 是 Task<IAsyncEnumerable<T>>(而不是 TaskTask<T>)。这个重载应该 return 一个专门的异步序列,它绑定到一个一次性资源,并在它的枚举完成时处理这个资源。下面是这样一个序列的实现:

public class AsyncEnumerableDisposable<T> : IAsyncEnumerable<T>
{
    private readonly IAsyncEnumerable<T> _source;
    private readonly IAsyncDisposable _disposable;

    public AsyncEnumerableDisposable(IAsyncEnumerable<T> source,
        IAsyncDisposable disposable)
    {
        // Arguments validation omitted
        _source = source;
        _disposable = disposable;
    }

    async IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(
        CancellationToken cancellationToken)
    {
        await using (_disposable.ConfigureAwait(false))
            await foreach (var item in _source
                .WithCancellation(cancellationToken)
                .ConfigureAwait(false)) yield return item;
    }
}

您可以像这样在 StreamData 方法中使用它:

private static async Task<IAsyncEnumerable<T>> StreamData<T>(
    Func<Stream, IAsyncEnumerable<T>> streamAction)
{
    var stream = await GetStream();
    return new AsyncEnumerableDisposable<T>(streamAction(stream), stream);
}

请记住,一般来说,IAsyncEnumerable<T> 可以在其生命周期内被枚举多次,通过将其包装到 AsyncEnumerableDisposable<T> 中,它实际上被简化为一个单一的枚举序列(因为资源将在第一次枚举后处理。


备选方案: System.Interactive.Async package contains the AsyncEnumerableEx.Using 运算符,可以代替自定义 AsyncEnumerableDisposable class:

private static async Task<IAsyncEnumerable<T>> StreamData<T>(
    Func<Stream, IAsyncEnumerable<T>> streamAction)
{
    var stream = await GetStream();
    return AsyncEnumerableEx.Using(() => stream, streamAction);
}

不同之处在于 Stream 将通过其 Dispose 方法同步处理。 AFAICS 不支持在此包中处理 IAsyncDisposables。

这里是 AsyncEnumerableEx.Using 方法的签名:

// Constructs an async-enumerable sequence that depends on a resource object, whose
// lifetime is tied to the resulting async-enumerable sequence's lifetime.
public static IAsyncEnumerable<TSource> Using<TSource, TResource>(
    Func<TResource> resourceFactory,
    Func<TResource, IAsyncEnumerable<TSource>> enumerableFactory)
    where TResource : IDisposable;