将流处理为 IAsyncEnumerable - 流不可读
Processing a Stream into IAsyncEnumerable - Stream is not readable
我有一个工作流程,我尝试执行以下操作:
- 一种接受回调的方法,它在内部产生一个
Stream
并且该方法的调用者可以使用回调以他们想要的任何方式处理 Stream
- 在一种特殊情况下,调用者使用回调从流中生成
IAsyncEnumerable
。
我在下面创建了一个最小的复制示例:
class Program
{
private static async Task<Stream> GetStream()
{
var text =
@"Multi-line
string";
await Task.Yield();
var bytes = Encoding.UTF8.GetBytes(text);
return new MemoryStream(bytes);
}
private static async Task<T> StreamData<T>(Func<Stream, T> streamAction)
{
await using var stream = await GetStream();
return streamAction(stream);
}
private static async Task StreamData(Func<Stream, Task> streamAction)
{
await using var stream = await GetStream();
await streamAction(stream);
}
private static async IAsyncEnumerable<string> GetTextLinesFromStream(Stream stream)
{
using var reader = new StreamReader(stream);
var line = await reader.ReadLineAsync();
while (line != null)
{
yield return line;
line = await reader.ReadLineAsync();
}
}
private static async Task Test1()
{
async Task GetRecords(Stream str)
{
await foreach(var line in GetTextLinesFromStream(str))
Console.WriteLine(line);
}
await StreamData(GetRecords);
}
private static async Task Test2()
{
await foreach(var line in await StreamData(GetTextLinesFromStream))
Console.WriteLine(line);
}
static async Task Main(string[] args)
{
await Test1();
await Test2();
}
}
在这里,方法 Test1
工作正常,而 Test2
没有,失败 Stream is not readable
。问题是在第二种情况下,当代码开始处理实际流时,流已经被处理掉了。
大概这两个例子的区别在于,对于第一个例子,读取流是在仍在一次性 stream
的上下文中执行的,而在第二个例子中我们已经在外面了。
但是,我认为第二种情况也可能有效 - 至少我觉得它非常符合 C# 惯用语。为了让第二种情况也能正常工作,我还缺少什么吗?
Test2
方法的问题在于 Stream
在创建 IAsyncEnumerable<string>
时释放,而不是在其枚举完成时释放。
Test2
方法使用第一个 StreamData
重载,return 是 Task<T>
的重载。本例中的 T
是 IAsyncEnumerable<string>
。因此 StreamData
方法 return 是一个生成异步序列的任务,然后立即处理流(在生成序列之后)。显然,现在不是处理流的合适时机。正确的时机是在 await foreach
循环完成之后。
为了使 Test2
透明地工作,您应该添加 StreamData
方法的第三个重载,即 return 是 Task<IAsyncEnumerable<T>>
(而不是 Task
或 Task<T>
)。这个重载应该 return 一个专门的异步序列,它绑定到一个一次性资源,并在它的枚举完成时处理这个资源。下面是这样一个序列的实现:
public class AsyncEnumerableDisposable<T> : IAsyncEnumerable<T>
{
private readonly IAsyncEnumerable<T> _source;
private readonly IAsyncDisposable _disposable;
public AsyncEnumerableDisposable(IAsyncEnumerable<T> source,
IAsyncDisposable disposable)
{
// Arguments validation omitted
_source = source;
_disposable = disposable;
}
async IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(
CancellationToken cancellationToken)
{
await using (_disposable.ConfigureAwait(false))
await foreach (var item in _source
.WithCancellation(cancellationToken)
.ConfigureAwait(false)) yield return item;
}
}
您可以像这样在 StreamData
方法中使用它:
private static async Task<IAsyncEnumerable<T>> StreamData<T>(
Func<Stream, IAsyncEnumerable<T>> streamAction)
{
var stream = await GetStream();
return new AsyncEnumerableDisposable<T>(streamAction(stream), stream);
}
请记住,一般来说,IAsyncEnumerable<T>
可以在其生命周期内被枚举多次,通过将其包装到 AsyncEnumerableDisposable<T>
中,它实际上被简化为一个单一的枚举序列(因为资源将在第一次枚举后处理。
备选方案: System.Interactive.Async package contains the AsyncEnumerableEx.Using
运算符,可以代替自定义 AsyncEnumerableDisposable
class:
private static async Task<IAsyncEnumerable<T>> StreamData<T>(
Func<Stream, IAsyncEnumerable<T>> streamAction)
{
var stream = await GetStream();
return AsyncEnumerableEx.Using(() => stream, streamAction);
}
不同之处在于 Stream
将通过其 Dispose
方法同步处理。 AFAICS 不支持在此包中处理 IAsyncDisposable
s。
这里是 AsyncEnumerableEx.Using
方法的签名:
// Constructs an async-enumerable sequence that depends on a resource object, whose
// lifetime is tied to the resulting async-enumerable sequence's lifetime.
public static IAsyncEnumerable<TSource> Using<TSource, TResource>(
Func<TResource> resourceFactory,
Func<TResource, IAsyncEnumerable<TSource>> enumerableFactory)
where TResource : IDisposable;
我有一个工作流程,我尝试执行以下操作:
- 一种接受回调的方法,它在内部产生一个
Stream
并且该方法的调用者可以使用回调以他们想要的任何方式处理Stream
- 在一种特殊情况下,调用者使用回调从流中生成
IAsyncEnumerable
。
我在下面创建了一个最小的复制示例:
class Program
{
private static async Task<Stream> GetStream()
{
var text =
@"Multi-line
string";
await Task.Yield();
var bytes = Encoding.UTF8.GetBytes(text);
return new MemoryStream(bytes);
}
private static async Task<T> StreamData<T>(Func<Stream, T> streamAction)
{
await using var stream = await GetStream();
return streamAction(stream);
}
private static async Task StreamData(Func<Stream, Task> streamAction)
{
await using var stream = await GetStream();
await streamAction(stream);
}
private static async IAsyncEnumerable<string> GetTextLinesFromStream(Stream stream)
{
using var reader = new StreamReader(stream);
var line = await reader.ReadLineAsync();
while (line != null)
{
yield return line;
line = await reader.ReadLineAsync();
}
}
private static async Task Test1()
{
async Task GetRecords(Stream str)
{
await foreach(var line in GetTextLinesFromStream(str))
Console.WriteLine(line);
}
await StreamData(GetRecords);
}
private static async Task Test2()
{
await foreach(var line in await StreamData(GetTextLinesFromStream))
Console.WriteLine(line);
}
static async Task Main(string[] args)
{
await Test1();
await Test2();
}
}
在这里,方法 Test1
工作正常,而 Test2
没有,失败 Stream is not readable
。问题是在第二种情况下,当代码开始处理实际流时,流已经被处理掉了。
大概这两个例子的区别在于,对于第一个例子,读取流是在仍在一次性 stream
的上下文中执行的,而在第二个例子中我们已经在外面了。
但是,我认为第二种情况也可能有效 - 至少我觉得它非常符合 C# 惯用语。为了让第二种情况也能正常工作,我还缺少什么吗?
Test2
方法的问题在于 Stream
在创建 IAsyncEnumerable<string>
时释放,而不是在其枚举完成时释放。
Test2
方法使用第一个 StreamData
重载,return 是 Task<T>
的重载。本例中的 T
是 IAsyncEnumerable<string>
。因此 StreamData
方法 return 是一个生成异步序列的任务,然后立即处理流(在生成序列之后)。显然,现在不是处理流的合适时机。正确的时机是在 await foreach
循环完成之后。
为了使 Test2
透明地工作,您应该添加 StreamData
方法的第三个重载,即 return 是 Task<IAsyncEnumerable<T>>
(而不是 Task
或 Task<T>
)。这个重载应该 return 一个专门的异步序列,它绑定到一个一次性资源,并在它的枚举完成时处理这个资源。下面是这样一个序列的实现:
public class AsyncEnumerableDisposable<T> : IAsyncEnumerable<T>
{
private readonly IAsyncEnumerable<T> _source;
private readonly IAsyncDisposable _disposable;
public AsyncEnumerableDisposable(IAsyncEnumerable<T> source,
IAsyncDisposable disposable)
{
// Arguments validation omitted
_source = source;
_disposable = disposable;
}
async IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(
CancellationToken cancellationToken)
{
await using (_disposable.ConfigureAwait(false))
await foreach (var item in _source
.WithCancellation(cancellationToken)
.ConfigureAwait(false)) yield return item;
}
}
您可以像这样在 StreamData
方法中使用它:
private static async Task<IAsyncEnumerable<T>> StreamData<T>(
Func<Stream, IAsyncEnumerable<T>> streamAction)
{
var stream = await GetStream();
return new AsyncEnumerableDisposable<T>(streamAction(stream), stream);
}
请记住,一般来说,IAsyncEnumerable<T>
可以在其生命周期内被枚举多次,通过将其包装到 AsyncEnumerableDisposable<T>
中,它实际上被简化为一个单一的枚举序列(因为资源将在第一次枚举后处理。
备选方案: System.Interactive.Async package contains the AsyncEnumerableEx.Using
运算符,可以代替自定义 AsyncEnumerableDisposable
class:
private static async Task<IAsyncEnumerable<T>> StreamData<T>(
Func<Stream, IAsyncEnumerable<T>> streamAction)
{
var stream = await GetStream();
return AsyncEnumerableEx.Using(() => stream, streamAction);
}
不同之处在于 Stream
将通过其 Dispose
方法同步处理。 AFAICS 不支持在此包中处理 IAsyncDisposable
s。
这里是 AsyncEnumerableEx.Using
方法的签名:
// Constructs an async-enumerable sequence that depends on a resource object, whose
// lifetime is tied to the resulting async-enumerable sequence's lifetime.
public static IAsyncEnumerable<TSource> Using<TSource, TResource>(
Func<TResource> resourceFactory,
Func<TResource, IAsyncEnumerable<TSource>> enumerableFactory)
where TResource : IDisposable;