Task.FromAsync 和两个线程
Task.FromAsync and two threads
我正在使用 .net 4.0 并具有以下代码:
var stream = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize,
FileOptions.Asynchronous | FileOptions.SequentialScan);
var buffer = new byte[bufferSize];
Debug.Assert(stream.IsAsync, "stream.IsAsync");
var ia = stream.BeginRead(buffer, 0, buffer.Length, t =>
{
var ms = new MemoryStream(buffer);
using (TextReader rdr = new StreamReader(ms, Encoding.ASCII))
{
for (uint iEpoch = 0; item < FileHeader.NUMBER_OF_ITEMS; item++)
{
dataList.Add(epochData);
}
}
}, null);
return Task<int>.Factory.FromAsync(ia, t =>
{
var st = stream;
var bytes1 = st.EndRead(t);
var a = EpochDataList.Count;
var b = FileHeader.NUMBER_OF_EPOCHS;
Debug.Assert(a == b);
st.Dispose();
return bytes1;
});
并且似乎在执行异步回调和结束方法 lambda 函数(断言正在提升)之间存在竞争条件。但是根据 msdn 明确指出 end 方法应该在异步回调完成后执行:
Creates a Task that executes an end method function when a specified IAsyncResult completes.
我是对的,我混淆了触发结束方法的 IO 操作完成的事实和异步回调完成的事实,所以它们都可能在同一个地方执行时间?
同时这段代码运行良好:
return Task<int>.Factory.FromAsync(stream.BeginRead, (ai) =>
{
var ms = new MemoryStream(buffer);
using (TextReader rdr = new StreamReader(ms, Encoding.ASCII))
{
using (TextReader rdr = new StreamReader(ms, Encoding.ASCII))
{
for (uint iEpoch = 0; item < FileHeader.NUMBER_OF_ITEMS; item++)
{
dataList.Add(epochData);
}
}
}
stream.Dispose();
return stream.EndRead(ai);
}, buffer, 0, buffer.Length, null);
另外我需要提到返回的任务在延续中使用。
提前致谢。
你做错了,我几乎倾向于不回答 - 你会用那个代码伤害别人。但是因为这不是代码审查...
您的最直接问题是您提供给BeginRead
的回调根本不是IAsyncResult
的一部分。因此,when a specified IAsyncResult completes
不会谈论您的回调,它只会谈论底层的异步操作 - 您会得到由同一事件启动的两个单独的回调。
现在,对于其他问题:
- 你需要不断发出
BeginRead
一遍又一遍,直到 EndRead
returns 0。否则,你最多只能读取整个缓冲区 - 如果你文件比那个长,你不会阅读整个文件。
- 您正在将老式异步 API 回调与基于任务的异步相结合。这势必会给你带来麻烦。只要学会正确使用 Tasks,你就会发现回调是 100% 不必要的。
EndRead
告诉您在前面的 BeginRead
操作中实际读取了多少字节 - 您忽略了该信息。
正确地做到这一点并不是那么容易 - 如果可能的话,我建议升级到 .NET 4.5,并利用 await
关键字。如果这不可能,您可以安装异步目标包,它将 await
添加到 4.0 作为一个简单的 NuGet 包。
使用await
,读取整个文件就像
一样简单
using (var sr = new StreamReader(fs))
{
string line;
while ((line = await sr.ReadLineAsync(buffer, 0, buffer.Length)) > 0)
{
// Do whatever
}
}
我正在使用 .net 4.0 并具有以下代码:
var stream = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize,
FileOptions.Asynchronous | FileOptions.SequentialScan);
var buffer = new byte[bufferSize];
Debug.Assert(stream.IsAsync, "stream.IsAsync");
var ia = stream.BeginRead(buffer, 0, buffer.Length, t =>
{
var ms = new MemoryStream(buffer);
using (TextReader rdr = new StreamReader(ms, Encoding.ASCII))
{
for (uint iEpoch = 0; item < FileHeader.NUMBER_OF_ITEMS; item++)
{
dataList.Add(epochData);
}
}
}, null);
return Task<int>.Factory.FromAsync(ia, t =>
{
var st = stream;
var bytes1 = st.EndRead(t);
var a = EpochDataList.Count;
var b = FileHeader.NUMBER_OF_EPOCHS;
Debug.Assert(a == b);
st.Dispose();
return bytes1;
});
并且似乎在执行异步回调和结束方法 lambda 函数(断言正在提升)之间存在竞争条件。但是根据 msdn 明确指出 end 方法应该在异步回调完成后执行:
Creates a Task that executes an end method function when a specified IAsyncResult completes.
我是对的,我混淆了触发结束方法的 IO 操作完成的事实和异步回调完成的事实,所以它们都可能在同一个地方执行时间?
同时这段代码运行良好:
return Task<int>.Factory.FromAsync(stream.BeginRead, (ai) =>
{
var ms = new MemoryStream(buffer);
using (TextReader rdr = new StreamReader(ms, Encoding.ASCII))
{
using (TextReader rdr = new StreamReader(ms, Encoding.ASCII))
{
for (uint iEpoch = 0; item < FileHeader.NUMBER_OF_ITEMS; item++)
{
dataList.Add(epochData);
}
}
}
stream.Dispose();
return stream.EndRead(ai);
}, buffer, 0, buffer.Length, null);
另外我需要提到返回的任务在延续中使用。
提前致谢。
你做错了,我几乎倾向于不回答 - 你会用那个代码伤害别人。但是因为这不是代码审查...
您的最直接问题是您提供给BeginRead
的回调根本不是IAsyncResult
的一部分。因此,when a specified IAsyncResult completes
不会谈论您的回调,它只会谈论底层的异步操作 - 您会得到由同一事件启动的两个单独的回调。
现在,对于其他问题:
- 你需要不断发出
BeginRead
一遍又一遍,直到EndRead
returns 0。否则,你最多只能读取整个缓冲区 - 如果你文件比那个长,你不会阅读整个文件。 - 您正在将老式异步 API 回调与基于任务的异步相结合。这势必会给你带来麻烦。只要学会正确使用 Tasks,你就会发现回调是 100% 不必要的。
EndRead
告诉您在前面的BeginRead
操作中实际读取了多少字节 - 您忽略了该信息。
正确地做到这一点并不是那么容易 - 如果可能的话,我建议升级到 .NET 4.5,并利用 await
关键字。如果这不可能,您可以安装异步目标包,它将 await
添加到 4.0 作为一个简单的 NuGet 包。
使用await
,读取整个文件就像
using (var sr = new StreamReader(fs))
{
string line;
while ((line = await sr.ReadLineAsync(buffer, 0, buffer.Length)) > 0)
{
// Do whatever
}
}