NetworkStream AsyncWrite 速度
NetworkStream AsyncWrite speed
我正在学习异步套接字编程。为了做一个更有挑战性的项目,我想创建一个用于群聊的服务器。我已经成功实现了它,但不确定性能是否足够好,怀疑自己哪里做错了。
基本上,我将 400 个用户连接到服务器,然后从其中一个用户发送 1000 条消息(每条消息 1 kB:开头是长度前缀,其余字节为零)。服务器需要向所有 400 个用户广播每条消息。服务器上保存着一个 NetworkStream 列表,当服务器收到消息时,它会遍历该列表并对每个流调用 stream.WriteAsync 方法。但是,服务器似乎需要 40-50 毫秒才能将一条消息发送给所有 400 个用户。在测试期间,服务器的 CPU 使用率约为 4%,而 StressClient 的 CPU 使用率约为 55%。我原以为它会比 40-50 毫秒快得多。是我做错了什么,还是这已经是最大速度了?
这是服务器代码(最后两个方法最相关:RecieveMessageAsync 和 SendToAllAsync):
// Streams of the clients that are currently connected. Additions are made under lock (connectedUsers).
private readonly List<NetworkStream> connectedUsers = new List<NetworkStream>();

// Number of broadcasts completed so far.
private int processedRequestsAmount;

// Started with the first broadcast and stopped after the 1000th one.
private readonly Stopwatch sw = new Stopwatch();
/// <summary>
/// Creates a server engine. Nothing is initialized here beyond the field initializers.
/// </summary>
public ServerEngine() { }
/// <summary>
/// Starts listening on the given endpoint, launches the accept loop in the background and
/// then blocks the calling thread, printing the processed-request counter on every key press.
/// </summary>
/// <param name="ipAddress">Local address to listen on.</param>
/// <param name="port">Local TCP port to listen on.</param>
public void Start(IPAddress ipAddress, int port)
{
    var tcpListener = new TcpListener(ipAddress, port);
    try
    {
        tcpListener.Start();

        // Deliberately not awaited: the accept loop runs alongside the console loop below.
        Task acceptLoop = AcceptClientsAsync(tcpListener);

        for (;;)
        {
            Console.ReadKey(true);
            Console.WriteLine("Processed requests: " + processedRequestsAmount);
        }
    }
    finally
    {
        // The loop above never exits normally, so this runs only when an exception escapes it.
        tcpListener.Stop();
        Console.WriteLine("Server stopped! Press ENTER to close application...");
        Console.ReadLine();
    }
}
/// <summary>
/// Accepts incoming connections until the listener is stopped and starts an independent
/// receive loop for every accepted client.
/// </summary>
/// <param name="listener">A listener that has already been started.</param>
/// <returns>A task that completes once the listener no longer accepts connections.</returns>
private async Task AcceptClientsAsync(TcpListener listener)
{
    while (true)
    {
        try
        {
            TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);

            // Not awaited on purpose: every client is served concurrently with the accept loop.
            Task clientLoop = StartClientListenerAsync(client);
        }
        catch (InvalidOperationException)
        {
            // Raised (also in the form of ObjectDisposedException) once the listener has been
            // stopped. Retrying would only spin and flood the console, so end the accept loop.
            return;
        }
        catch (Exception ex)
        {
            // A single failed accept must not prevent other clients from connecting.
            Console.WriteLine(ex.Message);
        }
    }
}
/// <summary>
/// Serves one client: registers its stream for broadcasts, relays every message it sends
/// until receiving fails, then unregisters the stream and disposes the client.
/// </summary>
/// <param name="client">The accepted connection; it is disposed when this method finishes.</param>
private async Task StartClientListenerAsync(TcpClient client)
{
    using (client)
    {
        var buf = new byte[1024];
        NetworkStream stream = client.GetStream();

        int userCount;
        lock (connectedUsers)
        {
            connectedUsers.Add(stream);
            // Read the count while still holding the lock so it is consistent with the add.
            userCount = connectedUsers.Count;
        }
        Console.WriteLine(userCount + " users connected!");

        while (true)
        {
            try
            {
                await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
            }
            catch (Exception)
            {
                // Any failure while receiving or relaying is treated as a disconnect.
                break;
            }
        }

        // List<T> is not thread-safe: removal must use the same lock as the add above.
        lock (connectedUsers)
        {
            connectedUsers.Remove(stream);
        }
        Console.WriteLine("User disconnected.");
    }
}
/// <summary>
/// Reads one length-prefixed message from <paramref name="stream"/> into
/// <paramref name="readBuffer"/> and broadcasts it to all connected users.
/// </summary>
/// <remarks>
/// Wire format: the first two bytes hold the total message length (header included) as an
/// Int16 in the byte order used by <see cref="BitConverter"/>; the payload follows.
/// </remarks>
/// <param name="stream">Stream of the sending client.</param>
/// <param name="readBuffer">Receive buffer; a complete message must fit into it.</param>
/// <exception cref="System.IO.EndOfStreamException">The peer closed the connection.</exception>
/// <exception cref="System.IO.InvalidDataException">The length prefix is out of range.</exception>
private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
{
    const int HeaderSize = 2;
    int totalAmountRead = 0;

    // Read the header (length, 2 bytes total).
    while (totalAmountRead < HeaderSize)
    {
        int read = await stream.ReadAsync(readBuffer, totalAmountRead, HeaderSize - totalAmountRead).ConfigureAwait(false);
        if (read == 0)
        {
            // ReadAsync returns 0 once the peer has closed the connection; without this
            // check the loop would spin forever instead of reporting the disconnect.
            throw new System.IO.EndOfStreamException("Connection closed while reading the message header.");
        }
        totalAmountRead += read;
    }

    short totalLength = BitConverter.ToInt16(readBuffer, 0);
    if (totalLength < HeaderSize || totalLength > readBuffer.Length)
    {
        throw new System.IO.InvalidDataException("Invalid message length: " + totalLength);
    }

    // Read the rest of the message.
    while (totalAmountRead < totalLength)
    {
        int read = await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
        if (read == 0)
        {
            throw new System.IO.EndOfStreamException("Connection closed in the middle of a message.");
        }
        totalAmountRead += read;
    }

    await SendToAllAsync(readBuffer, totalLength).ConfigureAwait(false);
}
/// <summary>
/// Writes the first <paramref name="totalLength"/> bytes of <paramref name="buffer"/> to every
/// connected user and waits until all writes have finished. It also drives the benchmark:
/// the stopwatch starts with the first broadcast and the average is printed after the 1000th.
/// </summary>
/// <param name="buffer">Buffer that holds the message, starting at index 0.</param>
/// <param name="totalLength">Number of bytes of <paramref name="buffer"/> that form the message.</param>
private async Task SendToAllAsync(byte[] buffer, short totalLength)
{
    // Take a snapshot under the lock: clients connect and disconnect concurrently, and
    // enumerating the live list while it changes throws InvalidOperationException.
    NetworkStream[] recipients;
    lock (connectedUsers)
    {
        recipients = connectedUsers.ToArray();
    }

    if (processedRequestsAmount == 0)
    {
        sw.Start();
    }

    var tasks = new List<Task>(recipients.Length);
    foreach (NetworkStream stream in recipients)
    {
        // Send only the message itself; writing buffer.Length bytes would append stale
        // buffer content and break the length-prefixed framing for shorter messages.
        tasks.Add(stream.WriteAsync(buffer, 0, totalLength));
    }
    await Task.WhenAll(tasks).ConfigureAwait(false);

    // Broadcasts triggered by different clients can finish concurrently, so count atomically.
    int processed = System.Threading.Interlocked.Increment(ref processedRequestsAmount);
    if (processed == 1000)
    {
        sw.Stop();
        Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
    }
}
还有一点——在 SendToAllAsync 中,您可以禁止捕获 ExecutionContext(即抑制执行上下文的流动):
// Suppress ExecutionContext capture once for the whole batch. Calling SuppressFlow() again
// while flow is already suppressed throws InvalidOperationException, so it must not be
// called on every iteration. Disposing the returned AsyncFlowControl restores the flow.
using (ExecutionContext.SuppressFlow())
{
    foreach (NetworkStream stream in connectedUsers)
    {
        tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length));
    }
}
事实证明,当我在不附加调试器的情况下运行服务器和 ClientStressTest 应用程序时(Visual Studio 中的 Ctrl+F5),服务器只需要 5 毫秒(CPU 使用率约为 30%)就能向 400 个用户发送一条消息,这比我期望的还要好得多。有人可以向我解释为什么附加调试器会让速度变慢吗?
无论如何,如果有人需要它来解决这个问题,这里是剩余的代码
ClientStressTest 的 Program.cs
/// <summary>
/// Stress-test client: opens NumOfClients connections to the server on localhost:4000, then
/// sends NumOfMessages length-prefixed messages from the first connection while every
/// connection counts the messages it receives.
/// </summary>
class Program
{
    static readonly int NumOfClients = 400;
    static readonly int NumOfMessages = 1000;
    static readonly NetworkStream[] Streams = new NetworkStream[NumOfClients];
    static readonly byte[] Message = new byte[1024];

    // Total number of messages received by all connections; guarded by objLock.
    static int counter = 0;
    static readonly object objLock = new object();

    static void Main(string[] args)
    {
        // The first two bytes carry the total message length; the remaining bytes stay zero.
        Buffer.BlockCopy(BitConverter.GetBytes((short)1024), 0, Message, 0, sizeof(short));

        Console.WriteLine("Press ENTER to run setup");
        Console.ReadLine();
        Setup().Wait();

        Console.WriteLine("Press ENTER to start sending");
        Console.ReadLine();

        NetworkStream sender = Streams[0];
        var pendingWrites = new Task[NumOfMessages];
        for (int i = 0; i < NumOfMessages; i++)
        {
            pendingWrites[i] = sender.WriteAsync(Message, 0, 1024);
        }

        // Observe the writes: previously a failed send was dropped without any notice.
        try
        {
            Task.WaitAll(pendingWrites);
        }
        catch (AggregateException ex)
        {
            Console.WriteLine("Sending failed: " + ex.GetBaseException().Message);
        }

        Console.ReadLine();
    }

    /// <summary>
    /// Connects all clients one after another and starts a background reader for each.
    /// </summary>
    static async Task Setup()
    {
        for (int i = 0; i < Streams.Length; i++)
        {
            TcpClient tcpClient = new TcpClient();
            // Connect asynchronously so that this async method actually awaits something
            // instead of blocking its caller on a synchronous Connect.
            await tcpClient.ConnectAsync("localhost", 4000).ConfigureAwait(false);

            NetworkStream stream = tcpClient.GetStream();
            Streams[i] = stream;

            // Not awaited on purpose: the reader runs for the lifetime of the connection.
            Task listenerTask = Task.Run(() => CallbackListener(stream));
        }
    }

    /// <summary>
    /// Reads length-prefixed messages from <paramref name="stream"/> until the server closes
    /// the connection, counting every complete message.
    /// </summary>
    static async Task CallbackListener(NetworkStream stream)
    {
        var readBuffer = new byte[1024];
        while (true)
        {
            int totalAmountRead = 0;

            // Header: 2 bytes holding the total message length.
            while (totalAmountRead < 2)
            {
                int read = await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false);
                if (read == 0)
                {
                    // 0 means the server closed the connection; looping on would spin forever.
                    return;
                }
                totalAmountRead += read;
            }

            short totalLength = BitConverter.ToInt16(readBuffer, 0);

            // Remainder of the message.
            while (totalAmountRead < totalLength)
            {
                int read = await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
                if (read == 0)
                {
                    return;
                }
                totalAmountRead += read;
            }

            lock (objLock)
            {
                counter++;
                if (counter % 1000 == 0)
                {
                    // Print every 1000th message to show progress.
                    Console.WriteLine(counter);
                }
            }
        }
    }
}
服务器的Program.cs
/// <summary>
/// Entry point of the server application.
/// </summary>
class Program
{
    /// <summary>
    /// Creates the server engine and runs it on port 4000 of all local interfaces.
    /// </summary>
    static void Main(string[] args)
    {
        ServerEngine engine = new ServerEngine();
        engine.Start(IPAddress.Any, 4000);
    }
}
服务器的ServerEngine.cs
/// <summary>
/// Group-chat server: accepts TCP clients and relays every length-prefixed message received
/// from one client to all connected clients.
/// </summary>
public class ServerEngine
{
    // Size of the length prefix at the start of every message.
    private const int HeaderSize = 2;

    // Streams of the clients that are currently connected; guarded by lock (connectedUsers).
    private readonly List<NetworkStream> connectedUsers = new List<NetworkStream>();

    // Number of broadcasts completed so far.
    private int processedRequestsAmount = 0;

    // Started with the first broadcast and stopped after the 1000th one.
    private readonly Stopwatch sw = new Stopwatch();

    public ServerEngine()
    {
    }

    /// <summary>
    /// Starts listening on the given endpoint, launches the accept loop in the background and
    /// then blocks the calling thread, printing the processed-request counter on every key press.
    /// </summary>
    /// <param name="ipAddress">Local address to listen on.</param>
    /// <param name="port">Local TCP port to listen on.</param>
    public void Start(IPAddress ipAddress, int port)
    {
        TcpListener listener = new TcpListener(ipAddress, port);
        try
        {
            listener.Start();

            // Deliberately not awaited: the accept loop runs alongside the console loop below.
            Task acceptLoop = AcceptClientsAsync(listener);

            while (true)
            {
                Console.ReadKey(true);
                Console.WriteLine("Processed requests: " + processedRequestsAmount);
            }
        }
        finally
        {
            listener.Stop();
            Console.WriteLine("Server stopped! Press ENTER to close application...");
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Accepts incoming connections until the listener is stopped and starts an independent
    /// receive loop for every accepted client.
    /// </summary>
    private async Task AcceptClientsAsync(TcpListener listener)
    {
        while (true)
        {
            try
            {
                TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);

                // Not awaited on purpose: every client is served concurrently.
                Task clientLoop = StartClientListenerAsync(client);
            }
            catch (InvalidOperationException)
            {
                // Raised (also in the form of ObjectDisposedException) once the listener has
                // been stopped. Retrying would only spin and flood the console.
                return;
            }
            catch (Exception ex)
            {
                // A single failed accept must not prevent other clients from connecting.
                Console.WriteLine(ex.Message);
            }
        }
    }

    /// <summary>
    /// Serves one client: registers its stream for broadcasts, relays every message it sends
    /// until receiving fails, then unregisters the stream and disposes the client.
    /// </summary>
    private async Task StartClientListenerAsync(TcpClient client)
    {
        using (client)
        {
            var buf = new byte[1024];
            NetworkStream stream = client.GetStream();

            int userCount;
            lock (connectedUsers)
            {
                connectedUsers.Add(stream);
                // Read the count while still holding the lock so it is consistent with the add.
                userCount = connectedUsers.Count;
            }
            Console.WriteLine(userCount + " users connected!");

            while (true)
            {
                try
                {
                    await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    // Any failure while receiving or relaying is treated as a disconnect.
                    break;
                }
            }

            // List<T> is not thread-safe: removal must use the same lock as the add above.
            lock (connectedUsers)
            {
                connectedUsers.Remove(stream);
            }
            Console.WriteLine("User disconnected.");
        }
    }

    /// <summary>
    /// Reads one length-prefixed message into <paramref name="readBuffer"/> and broadcasts it.
    /// The first two bytes hold the total message length (header included) as an Int16.
    /// </summary>
    /// <exception cref="System.IO.EndOfStreamException">The peer closed the connection.</exception>
    /// <exception cref="System.IO.InvalidDataException">The length prefix is out of range.</exception>
    private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
    {
        // Read the header (length, 2 bytes total).
        await ReadUpToAsync(stream, readBuffer, 0, HeaderSize).ConfigureAwait(false);

        short totalLength = BitConverter.ToInt16(readBuffer, 0);
        if (totalLength < HeaderSize || totalLength > readBuffer.Length)
        {
            throw new System.IO.InvalidDataException("Invalid message length: " + totalLength);
        }

        // Read the rest of the message.
        await ReadUpToAsync(stream, readBuffer, HeaderSize, totalLength).ConfigureAwait(false);

        await SendToAll(readBuffer, totalLength).ConfigureAwait(false);
    }

    // Fills buffer[offset..end) from the stream, failing if the peer closes the connection first.
    private static async Task ReadUpToAsync(NetworkStream stream, byte[] buffer, int offset, int end)
    {
        while (offset < end)
        {
            int read = await stream.ReadAsync(buffer, offset, end - offset).ConfigureAwait(false);
            if (read == 0)
            {
                // ReadAsync returns 0 once the peer has closed the connection; without this
                // check the loop would spin forever instead of reporting the disconnect.
                throw new System.IO.EndOfStreamException("Connection closed in the middle of a message.");
            }
            offset += read;
        }
    }

    /// <summary>
    /// Writes the first <paramref name="totalLength"/> bytes of <paramref name="buffer"/> to
    /// every connected user and waits until all writes have finished. The stopwatch starts with
    /// the first broadcast and the average is printed after the 1000th.
    /// </summary>
    private async Task SendToAll(byte[] buffer, short totalLength)
    {
        // Take a snapshot under the lock: clients connect and disconnect concurrently, and
        // enumerating the live list while it changes throws InvalidOperationException.
        NetworkStream[] recipients;
        lock (connectedUsers)
        {
            recipients = connectedUsers.ToArray();
        }

        if (processedRequestsAmount == 0)
        {
            sw.Start();
        }

        var tasks = new List<Task>(recipients.Length);
        foreach (NetworkStream stream in recipients)
        {
            // Send only the message itself; writing buffer.Length bytes would append stale
            // buffer content and break the length-prefixed framing for shorter messages.
            tasks.Add(stream.WriteAsync(buffer, 0, totalLength));
        }
        await Task.WhenAll(tasks).ConfigureAwait(false);

        // Broadcasts triggered by different clients can finish concurrently, so count atomically.
        int processed = System.Threading.Interlocked.Increment(ref processedRequestsAmount);
        if (processed == 1000)
        {
            sw.Stop();
            Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
        }
    }
}
我正在学习异步套接字编程。为了做一个更有挑战性的项目,我想创建一个用于群聊的服务器。我已经成功实现了它,但不确定性能是否足够好,怀疑自己哪里做错了。
基本上,我将 400 个用户连接到服务器,然后从其中一个用户发送 1000 条消息(每条消息 1 kB:开头是长度前缀,其余字节为零)。服务器需要向所有 400 个用户广播每条消息。服务器上保存着一个 NetworkStream 列表,当服务器收到消息时,它会遍历该列表并对每个流调用 stream.WriteAsync 方法。但是,服务器似乎需要 40-50 毫秒才能将一条消息发送给所有 400 个用户。在测试期间,服务器的 CPU 使用率约为 4%,而 StressClient 的 CPU 使用率约为 55%。我原以为它会比 40-50 毫秒快得多。是我做错了什么,还是这已经是最大速度了?
这是服务器代码(最后两个方法最相关:RecieveMessageAsync 和 SendToAllAsync):
// Streams of the clients that are currently connected. Additions are made under lock (connectedUsers).
private readonly List<NetworkStream> connectedUsers = new List<NetworkStream>();

// Number of broadcasts completed so far.
private int processedRequestsAmount;

// Started with the first broadcast and stopped after the 1000th one.
private readonly Stopwatch sw = new Stopwatch();
/// <summary>
/// Creates a server engine. Nothing is initialized here beyond the field initializers.
/// </summary>
public ServerEngine() { }
/// <summary>
/// Starts listening on the given endpoint, launches the accept loop in the background and
/// then blocks the calling thread, printing the processed-request counter on every key press.
/// </summary>
/// <param name="ipAddress">Local address to listen on.</param>
/// <param name="port">Local TCP port to listen on.</param>
public void Start(IPAddress ipAddress, int port)
{
    var tcpListener = new TcpListener(ipAddress, port);
    try
    {
        tcpListener.Start();

        // Deliberately not awaited: the accept loop runs alongside the console loop below.
        Task acceptLoop = AcceptClientsAsync(tcpListener);

        for (;;)
        {
            Console.ReadKey(true);
            Console.WriteLine("Processed requests: " + processedRequestsAmount);
        }
    }
    finally
    {
        // The loop above never exits normally, so this runs only when an exception escapes it.
        tcpListener.Stop();
        Console.WriteLine("Server stopped! Press ENTER to close application...");
        Console.ReadLine();
    }
}
/// <summary>
/// Accepts incoming connections until the listener is stopped and starts an independent
/// receive loop for every accepted client.
/// </summary>
/// <param name="listener">A listener that has already been started.</param>
/// <returns>A task that completes once the listener no longer accepts connections.</returns>
private async Task AcceptClientsAsync(TcpListener listener)
{
    while (true)
    {
        try
        {
            TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);

            // Not awaited on purpose: every client is served concurrently with the accept loop.
            Task clientLoop = StartClientListenerAsync(client);
        }
        catch (InvalidOperationException)
        {
            // Raised (also in the form of ObjectDisposedException) once the listener has been
            // stopped. Retrying would only spin and flood the console, so end the accept loop.
            return;
        }
        catch (Exception ex)
        {
            // A single failed accept must not prevent other clients from connecting.
            Console.WriteLine(ex.Message);
        }
    }
}
/// <summary>
/// Serves one client: registers its stream for broadcasts, relays every message it sends
/// until receiving fails, then unregisters the stream and disposes the client.
/// </summary>
/// <param name="client">The accepted connection; it is disposed when this method finishes.</param>
private async Task StartClientListenerAsync(TcpClient client)
{
    using (client)
    {
        var buf = new byte[1024];
        NetworkStream stream = client.GetStream();

        int userCount;
        lock (connectedUsers)
        {
            connectedUsers.Add(stream);
            // Read the count while still holding the lock so it is consistent with the add.
            userCount = connectedUsers.Count;
        }
        Console.WriteLine(userCount + " users connected!");

        while (true)
        {
            try
            {
                await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
            }
            catch (Exception)
            {
                // Any failure while receiving or relaying is treated as a disconnect.
                break;
            }
        }

        // List<T> is not thread-safe: removal must use the same lock as the add above.
        lock (connectedUsers)
        {
            connectedUsers.Remove(stream);
        }
        Console.WriteLine("User disconnected.");
    }
}
/// <summary>
/// Reads one length-prefixed message from <paramref name="stream"/> into
/// <paramref name="readBuffer"/> and broadcasts it to all connected users.
/// </summary>
/// <remarks>
/// Wire format: the first two bytes hold the total message length (header included) as an
/// Int16 in the byte order used by <see cref="BitConverter"/>; the payload follows.
/// </remarks>
/// <param name="stream">Stream of the sending client.</param>
/// <param name="readBuffer">Receive buffer; a complete message must fit into it.</param>
/// <exception cref="System.IO.EndOfStreamException">The peer closed the connection.</exception>
/// <exception cref="System.IO.InvalidDataException">The length prefix is out of range.</exception>
private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
{
    const int HeaderSize = 2;
    int totalAmountRead = 0;

    // Read the header (length, 2 bytes total).
    while (totalAmountRead < HeaderSize)
    {
        int read = await stream.ReadAsync(readBuffer, totalAmountRead, HeaderSize - totalAmountRead).ConfigureAwait(false);
        if (read == 0)
        {
            // ReadAsync returns 0 once the peer has closed the connection; without this
            // check the loop would spin forever instead of reporting the disconnect.
            throw new System.IO.EndOfStreamException("Connection closed while reading the message header.");
        }
        totalAmountRead += read;
    }

    short totalLength = BitConverter.ToInt16(readBuffer, 0);
    if (totalLength < HeaderSize || totalLength > readBuffer.Length)
    {
        throw new System.IO.InvalidDataException("Invalid message length: " + totalLength);
    }

    // Read the rest of the message.
    while (totalAmountRead < totalLength)
    {
        int read = await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
        if (read == 0)
        {
            throw new System.IO.EndOfStreamException("Connection closed in the middle of a message.");
        }
        totalAmountRead += read;
    }

    await SendToAllAsync(readBuffer, totalLength).ConfigureAwait(false);
}
/// <summary>
/// Writes the first <paramref name="totalLength"/> bytes of <paramref name="buffer"/> to every
/// connected user and waits until all writes have finished. It also drives the benchmark:
/// the stopwatch starts with the first broadcast and the average is printed after the 1000th.
/// </summary>
/// <param name="buffer">Buffer that holds the message, starting at index 0.</param>
/// <param name="totalLength">Number of bytes of <paramref name="buffer"/> that form the message.</param>
private async Task SendToAllAsync(byte[] buffer, short totalLength)
{
    // Take a snapshot under the lock: clients connect and disconnect concurrently, and
    // enumerating the live list while it changes throws InvalidOperationException.
    NetworkStream[] recipients;
    lock (connectedUsers)
    {
        recipients = connectedUsers.ToArray();
    }

    if (processedRequestsAmount == 0)
    {
        sw.Start();
    }

    var tasks = new List<Task>(recipients.Length);
    foreach (NetworkStream stream in recipients)
    {
        // Send only the message itself; writing buffer.Length bytes would append stale
        // buffer content and break the length-prefixed framing for shorter messages.
        tasks.Add(stream.WriteAsync(buffer, 0, totalLength));
    }
    await Task.WhenAll(tasks).ConfigureAwait(false);

    // Broadcasts triggered by different clients can finish concurrently, so count atomically.
    int processed = System.Threading.Interlocked.Increment(ref processedRequestsAmount);
    if (processed == 1000)
    {
        sw.Stop();
        Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
    }
}
还有一点——在 SendToAllAsync 中,您可以禁止捕获 ExecutionContext(即抑制执行上下文的流动):
// Suppress ExecutionContext capture once for the whole batch. Calling SuppressFlow() again
// while flow is already suppressed throws InvalidOperationException, so it must not be
// called on every iteration. Disposing the returned AsyncFlowControl restores the flow.
using (ExecutionContext.SuppressFlow())
{
    foreach (NetworkStream stream in connectedUsers)
    {
        tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length));
    }
}
事实证明,当我在不附加调试器的情况下运行服务器和 ClientStressTest 应用程序时(Visual Studio 中的 Ctrl+F5),服务器只需要 5 毫秒(CPU 使用率约为 30%)就能向 400 个用户发送一条消息,这比我期望的还要好得多。有人可以向我解释为什么附加调试器会让速度变慢吗?
无论如何,如果有人需要它来解决这个问题,这里是剩余的代码
ClientStressTest 的 Program.cs
/// <summary>
/// Stress-test client: opens NumOfClients connections to the server on localhost:4000, then
/// sends NumOfMessages length-prefixed messages from the first connection while every
/// connection counts the messages it receives.
/// </summary>
class Program
{
    static readonly int NumOfClients = 400;
    static readonly int NumOfMessages = 1000;
    static readonly NetworkStream[] Streams = new NetworkStream[NumOfClients];
    static readonly byte[] Message = new byte[1024];

    // Total number of messages received by all connections; guarded by objLock.
    static int counter = 0;
    static readonly object objLock = new object();

    static void Main(string[] args)
    {
        // The first two bytes carry the total message length; the remaining bytes stay zero.
        Buffer.BlockCopy(BitConverter.GetBytes((short)1024), 0, Message, 0, sizeof(short));

        Console.WriteLine("Press ENTER to run setup");
        Console.ReadLine();
        Setup().Wait();

        Console.WriteLine("Press ENTER to start sending");
        Console.ReadLine();

        NetworkStream sender = Streams[0];
        var pendingWrites = new Task[NumOfMessages];
        for (int i = 0; i < NumOfMessages; i++)
        {
            pendingWrites[i] = sender.WriteAsync(Message, 0, 1024);
        }

        // Observe the writes: previously a failed send was dropped without any notice.
        try
        {
            Task.WaitAll(pendingWrites);
        }
        catch (AggregateException ex)
        {
            Console.WriteLine("Sending failed: " + ex.GetBaseException().Message);
        }

        Console.ReadLine();
    }

    /// <summary>
    /// Connects all clients one after another and starts a background reader for each.
    /// </summary>
    static async Task Setup()
    {
        for (int i = 0; i < Streams.Length; i++)
        {
            TcpClient tcpClient = new TcpClient();
            // Connect asynchronously so that this async method actually awaits something
            // instead of blocking its caller on a synchronous Connect.
            await tcpClient.ConnectAsync("localhost", 4000).ConfigureAwait(false);

            NetworkStream stream = tcpClient.GetStream();
            Streams[i] = stream;

            // Not awaited on purpose: the reader runs for the lifetime of the connection.
            Task listenerTask = Task.Run(() => CallbackListener(stream));
        }
    }

    /// <summary>
    /// Reads length-prefixed messages from <paramref name="stream"/> until the server closes
    /// the connection, counting every complete message.
    /// </summary>
    static async Task CallbackListener(NetworkStream stream)
    {
        var readBuffer = new byte[1024];
        while (true)
        {
            int totalAmountRead = 0;

            // Header: 2 bytes holding the total message length.
            while (totalAmountRead < 2)
            {
                int read = await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false);
                if (read == 0)
                {
                    // 0 means the server closed the connection; looping on would spin forever.
                    return;
                }
                totalAmountRead += read;
            }

            short totalLength = BitConverter.ToInt16(readBuffer, 0);

            // Remainder of the message.
            while (totalAmountRead < totalLength)
            {
                int read = await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
                if (read == 0)
                {
                    return;
                }
                totalAmountRead += read;
            }

            lock (objLock)
            {
                counter++;
                if (counter % 1000 == 0)
                {
                    // Print every 1000th message to show progress.
                    Console.WriteLine(counter);
                }
            }
        }
    }
}
服务器的Program.cs
/// <summary>
/// Entry point of the server application.
/// </summary>
class Program
{
    /// <summary>
    /// Creates the server engine and runs it on port 4000 of all local interfaces.
    /// </summary>
    static void Main(string[] args)
    {
        ServerEngine engine = new ServerEngine();
        engine.Start(IPAddress.Any, 4000);
    }
}
服务器的ServerEngine.cs
/// <summary>
/// Group-chat server: accepts TCP clients and relays every length-prefixed message received
/// from one client to all connected clients.
/// </summary>
public class ServerEngine
{
    // Size of the length prefix at the start of every message.
    private const int HeaderSize = 2;

    // Streams of the clients that are currently connected; guarded by lock (connectedUsers).
    private readonly List<NetworkStream> connectedUsers = new List<NetworkStream>();

    // Number of broadcasts completed so far.
    private int processedRequestsAmount = 0;

    // Started with the first broadcast and stopped after the 1000th one.
    private readonly Stopwatch sw = new Stopwatch();

    public ServerEngine()
    {
    }

    /// <summary>
    /// Starts listening on the given endpoint, launches the accept loop in the background and
    /// then blocks the calling thread, printing the processed-request counter on every key press.
    /// </summary>
    /// <param name="ipAddress">Local address to listen on.</param>
    /// <param name="port">Local TCP port to listen on.</param>
    public void Start(IPAddress ipAddress, int port)
    {
        TcpListener listener = new TcpListener(ipAddress, port);
        try
        {
            listener.Start();

            // Deliberately not awaited: the accept loop runs alongside the console loop below.
            Task acceptLoop = AcceptClientsAsync(listener);

            while (true)
            {
                Console.ReadKey(true);
                Console.WriteLine("Processed requests: " + processedRequestsAmount);
            }
        }
        finally
        {
            listener.Stop();
            Console.WriteLine("Server stopped! Press ENTER to close application...");
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Accepts incoming connections until the listener is stopped and starts an independent
    /// receive loop for every accepted client.
    /// </summary>
    private async Task AcceptClientsAsync(TcpListener listener)
    {
        while (true)
        {
            try
            {
                TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);

                // Not awaited on purpose: every client is served concurrently.
                Task clientLoop = StartClientListenerAsync(client);
            }
            catch (InvalidOperationException)
            {
                // Raised (also in the form of ObjectDisposedException) once the listener has
                // been stopped. Retrying would only spin and flood the console.
                return;
            }
            catch (Exception ex)
            {
                // A single failed accept must not prevent other clients from connecting.
                Console.WriteLine(ex.Message);
            }
        }
    }

    /// <summary>
    /// Serves one client: registers its stream for broadcasts, relays every message it sends
    /// until receiving fails, then unregisters the stream and disposes the client.
    /// </summary>
    private async Task StartClientListenerAsync(TcpClient client)
    {
        using (client)
        {
            var buf = new byte[1024];
            NetworkStream stream = client.GetStream();

            int userCount;
            lock (connectedUsers)
            {
                connectedUsers.Add(stream);
                // Read the count while still holding the lock so it is consistent with the add.
                userCount = connectedUsers.Count;
            }
            Console.WriteLine(userCount + " users connected!");

            while (true)
            {
                try
                {
                    await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    // Any failure while receiving or relaying is treated as a disconnect.
                    break;
                }
            }

            // List<T> is not thread-safe: removal must use the same lock as the add above.
            lock (connectedUsers)
            {
                connectedUsers.Remove(stream);
            }
            Console.WriteLine("User disconnected.");
        }
    }

    /// <summary>
    /// Reads one length-prefixed message into <paramref name="readBuffer"/> and broadcasts it.
    /// The first two bytes hold the total message length (header included) as an Int16.
    /// </summary>
    /// <exception cref="System.IO.EndOfStreamException">The peer closed the connection.</exception>
    /// <exception cref="System.IO.InvalidDataException">The length prefix is out of range.</exception>
    private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
    {
        // Read the header (length, 2 bytes total).
        await ReadUpToAsync(stream, readBuffer, 0, HeaderSize).ConfigureAwait(false);

        short totalLength = BitConverter.ToInt16(readBuffer, 0);
        if (totalLength < HeaderSize || totalLength > readBuffer.Length)
        {
            throw new System.IO.InvalidDataException("Invalid message length: " + totalLength);
        }

        // Read the rest of the message.
        await ReadUpToAsync(stream, readBuffer, HeaderSize, totalLength).ConfigureAwait(false);

        await SendToAll(readBuffer, totalLength).ConfigureAwait(false);
    }

    // Fills buffer[offset..end) from the stream, failing if the peer closes the connection first.
    private static async Task ReadUpToAsync(NetworkStream stream, byte[] buffer, int offset, int end)
    {
        while (offset < end)
        {
            int read = await stream.ReadAsync(buffer, offset, end - offset).ConfigureAwait(false);
            if (read == 0)
            {
                // ReadAsync returns 0 once the peer has closed the connection; without this
                // check the loop would spin forever instead of reporting the disconnect.
                throw new System.IO.EndOfStreamException("Connection closed in the middle of a message.");
            }
            offset += read;
        }
    }

    /// <summary>
    /// Writes the first <paramref name="totalLength"/> bytes of <paramref name="buffer"/> to
    /// every connected user and waits until all writes have finished. The stopwatch starts with
    /// the first broadcast and the average is printed after the 1000th.
    /// </summary>
    private async Task SendToAll(byte[] buffer, short totalLength)
    {
        // Take a snapshot under the lock: clients connect and disconnect concurrently, and
        // enumerating the live list while it changes throws InvalidOperationException.
        NetworkStream[] recipients;
        lock (connectedUsers)
        {
            recipients = connectedUsers.ToArray();
        }

        if (processedRequestsAmount == 0)
        {
            sw.Start();
        }

        var tasks = new List<Task>(recipients.Length);
        foreach (NetworkStream stream in recipients)
        {
            // Send only the message itself; writing buffer.Length bytes would append stale
            // buffer content and break the length-prefixed framing for shorter messages.
            tasks.Add(stream.WriteAsync(buffer, 0, totalLength));
        }
        await Task.WhenAll(tasks).ConfigureAwait(false);

        // Broadcasts triggered by different clients can finish concurrently, so count atomically.
        int processed = System.Threading.Interlocked.Increment(ref processedRequestsAmount);
        if (processed == 1000)
        {
            sw.Stop();
            Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
        }
    }
}