TPL FromAsync 任务不断被取消
TPL FromAsync Task keeps getting cancelled
我在循环中从客户端向服务器发送 10 条消息,然后使用 TPL 的 FromAsync(针对 .NET 4.0)从服务器向客户端返回响应
从客户端发送最后一条消息后,我正在尝试测量到收到最终响应为止所花费的总时间。为此,我调用 Task.WaitAll 但我得到一个 AggregateException,它的内部异常对所有任务说 "A task was cancelled"。服务器正在接收请求。
代码:
private static double MeasureAsyncTPL(List<string> stringList)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var tasks = new Task[stringList.Count];
for (int i = 0; i < stringList.Count; i++)
{
string source = stringList[i];
tasks[i] = SendMessageToServerAsync(source);
//SendMessageToServerAsync(source);
}
Task.WaitAll(tasks);
stopwatch.Stop();
return stopwatch.Elapsed.TotalMilliseconds;
}
static Task SendMessageToServerAsync(string message)
{
Task task;
var byteArray = MessageToByteArray(message, Encoding.UTF8);
using (var tcpClient = new TcpClient())
{
tcpClient.Connect("127.0.0.1", 5000);
using (var networkStream = tcpClient.GetStream())
{
task = Write(networkStream,byteArray,0);
Task continuation = task.ContinueWith(ant => Read(ant, networkStream), TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent);
return continuation;
}
}
}
private static Task Write(NetworkStream stream, byte[] buffer, int offset)
{
return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, offset, buffer.Length, null);
}
private static Task Read(Task write, NetworkStream stream)
{
byte[] data = new byte[50];
return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, data, 0, data.Length, null);
}
在 Task.WaitAll(tasks)
行的 MeasureAsyncTPL 内部发现异常。我该如何解决这个异常?
第二个问题 - 我的 Read 方法中的 FromAsync 是否保证 return 来自服务器的整个消息?或者我是否需要多次调用它,同时跟踪服务器发送的字节数组的大小?
您要在写入和读取操作开始或有时间工作之前处理 networkStream
和 tcpClient
。
使用 .NET 4.0,您需要做一些像这样复杂的事情(这是开始的事情,它需要更多的工作):
var byteArray = MessageToByteArray(message, Encoding.UTF8);
var tcpClient = new TcpClient();
NetworkStream networkStream = null;
try
{
tcpClient.Connect("127.0.0.1", 5000);
networkStream = tcpClient.GetStream();
}
catch
{
((IDisposable)tcpClient).Dispose();
throw;
}
byte[] read_buffer = new byte[50];
Action dispose_action = () =>
{
((IDisposable)tcpClient).Dispose();
networkStream.Dispose();
};
var write_task = networkStream.WriteAsync(byteArray, 0, byteArray.Length);
write_task
.ContinueWith(wt =>
{
var read_task = networkStream.ReadAsync(read_buffer, 0, read_buffer.Length);
read_task.ContinueWith(rt =>
{
//See if rt task is successful and if so, process read data here
dispose_action();
});
},
TaskContinuationOptions.OnlyOnRanToCompletion);
write_task.ContinueWith(wt => dispose_action, TaskContinuationOptions.NotOnRanToCompletion);
上面代码的主要思想是,在 read/write 操作完成之前,您不会处理 TCP 客户端和网络流对象。
想象一下,如果您想在某种循环中多次读取,这段代码会变得多么复杂。
在 .NET 4.5 中,您可以使用 async
和 await
来做一些事情(更简单),例如:
static async Task SendMessageToServerAsync(string message)
{
var byteArray = MessageToByteArray(message, Encoding.UTF8);
using (var tcpClient = new TcpClient())
{
tcpClient.Connect("127.0.0.1", 5000);
using (var networkStream = tcpClient.GetStream())
{
await networkStream.WriteAsync(byteArray, 0, byteArray.Length);
byte[] read_buffer = new byte[50];
int read = await networkStream.ReadAsync(read_buffer, 0, read_buffer.Length);
// do something with the read_buffer
}
}
}
我在循环中从客户端向服务器发送 10 条消息,然后使用 TPL 的 FromAsync(针对 .NET 4.0)从服务器向客户端返回响应
从客户端发送最后一条消息后,我正在尝试测量到收到最终响应为止所花费的总时间。为此,我调用 Task.WaitAll 但我得到一个 AggregateException,它的内部异常对所有任务说 "A task was cancelled"。服务器正在接收请求。
代码:
private static double MeasureAsyncTPL(List<string> stringList)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var tasks = new Task[stringList.Count];
for (int i = 0; i < stringList.Count; i++)
{
string source = stringList[i];
tasks[i] = SendMessageToServerAsync(source);
//SendMessageToServerAsync(source);
}
Task.WaitAll(tasks);
stopwatch.Stop();
return stopwatch.Elapsed.TotalMilliseconds;
}
static Task SendMessageToServerAsync(string message)
{
Task task;
var byteArray = MessageToByteArray(message, Encoding.UTF8);
using (var tcpClient = new TcpClient())
{
tcpClient.Connect("127.0.0.1", 5000);
using (var networkStream = tcpClient.GetStream())
{
task = Write(networkStream,byteArray,0);
Task continuation = task.ContinueWith(ant => Read(ant, networkStream), TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent);
return continuation;
}
}
}
private static Task Write(NetworkStream stream, byte[] buffer, int offset)
{
return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, offset, buffer.Length, null);
}
private static Task Read(Task write, NetworkStream stream)
{
byte[] data = new byte[50];
return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, data, 0, data.Length, null);
}
在 Task.WaitAll(tasks)
行的 MeasureAsyncTPL 内部发现异常。我该如何解决这个异常?
第二个问题 - 我的 Read 方法中的 FromAsync 是否保证 return 来自服务器的整个消息?或者我是否需要多次调用它,同时跟踪服务器发送的字节数组的大小?
您要在写入和读取操作开始或有时间工作之前处理 networkStream
和 tcpClient
。
使用 .NET 4.0,您需要做一些像这样复杂的事情(这是开始的事情,它需要更多的工作):
var byteArray = MessageToByteArray(message, Encoding.UTF8);
var tcpClient = new TcpClient();
NetworkStream networkStream = null;
try
{
tcpClient.Connect("127.0.0.1", 5000);
networkStream = tcpClient.GetStream();
}
catch
{
((IDisposable)tcpClient).Dispose();
throw;
}
byte[] read_buffer = new byte[50];
Action dispose_action = () =>
{
((IDisposable)tcpClient).Dispose();
networkStream.Dispose();
};
var write_task = networkStream.WriteAsync(byteArray, 0, byteArray.Length);
write_task
.ContinueWith(wt =>
{
var read_task = networkStream.ReadAsync(read_buffer, 0, read_buffer.Length);
read_task.ContinueWith(rt =>
{
//See if rt task is successful and if so, process read data here
dispose_action();
});
},
TaskContinuationOptions.OnlyOnRanToCompletion);
write_task.ContinueWith(wt => dispose_action, TaskContinuationOptions.NotOnRanToCompletion);
上面代码的主要思想是,在 read/write 操作完成之前,您不会处理 TCP 客户端和网络流对象。
想象一下,如果您想在某种循环中多次读取,这段代码会变得多么复杂。
在 .NET 4.5 中,您可以使用 async
和 await
来做一些事情(更简单),例如:
static async Task SendMessageToServerAsync(string message)
{
var byteArray = MessageToByteArray(message, Encoding.UTF8);
using (var tcpClient = new TcpClient())
{
tcpClient.Connect("127.0.0.1", 5000);
using (var networkStream = tcpClient.GetStream())
{
await networkStream.WriteAsync(byteArray, 0, byteArray.Length);
byte[] read_buffer = new byte[50];
int read = await networkStream.ReadAsync(read_buffer, 0, read_buffer.Length);
// do something with the read_buffer
}
}
}