NetworkStream AsyncWrite 速度

NetworkStream AsyncWrite speed

我正在学习异步套接字编程,为了挑战一个更难的项目,我想创建一个用于群聊的服务器。我已经成功实现了它,但我不确定性能是否足够好,怀疑自己做错了什么。

基本上,我将 400 个用户连接到服务器,然后从其中一个用户发送 1000 条消息(每条消息为 1 kB,开头是长度前缀,其余部分为空)。服务器需要向所有 400 个用户广播每条消息。服务器上保存着一个网络流列表,当服务器收到消息时,它会遍历该列表并对每个流调用 stream.WriteAsync 方法。但是,服务器似乎需要 40-50 毫秒才能将一条消息发送给所有 400 个用户。在测试期间,服务器 CPU 使用率约为 4%,而 StressClient CPU 使用率约为 55%。我原以为它会比 40-50 毫秒快得多。是我做错了什么,还是这已经是最大速度?

这是服务器代码(最后两个方法最相关,ReceiveMessageAsync 和 SendToAllAsync)

// Streams of all currently connected clients; Add is performed under lock (connectedUsers).
private List<NetworkStream> connectedUsers = new List<NetworkStream>();
// Number of broadcasts completed so far; printed by Start on every key press.
private int processedRequestsAmount = 0;
// Started before the first broadcast and stopped after the 1000th to compute an average.
private Stopwatch sw = new Stopwatch();

// Nothing to initialize beyond the field initializers above.
public ServerEngine()
{
}

/// <summary>
/// Starts listening on the given endpoint, kicks off the accept loop and then
/// blocks the calling thread, printing the processed-request counter on every key press.
/// </summary>
/// <param name="ipAddress">Local address to bind the listener to.</param>
/// <param name="port">Local TCP port to listen on.</param>
public void Start(IPAddress ipAddress, int port)
{
    var tcpListener = new TcpListener(ipAddress, port);
    try
    {
        tcpListener.Start();

        // The accept loop runs in the background; it is intentionally not awaited here.
        Task acceptLoop = AcceptClientsAsync(tcpListener);

        // This loop has no exit, so the finally block only runs if something above throws.
        for (;;)
        {
            Console.ReadKey(true);
            Console.WriteLine("Processed requests: {0}", processedRequestsAmount);
        }
    }
    finally
    {
        tcpListener.Stop();
        Console.WriteLine("Server stopped! Press ENTER to close application...");
        Console.ReadLine();
    }
}

/// <summary>
/// Accepts incoming TCP clients until the listener is stopped and starts a
/// receive loop for each accepted client.
/// </summary>
/// <param name="listener">A listener that has already been started.</param>
/// <returns>A task that completes once the listener can no longer accept clients.</returns>
private async Task AcceptClientsAsync(TcpListener listener)
{
    while (true)
    {
        try
        {
            TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);

            // Each client gets its own receive loop; it is intentionally not awaited.
            StartClientListenerAsync(client);
        }
        catch (InvalidOperationException)
        {
            // Thrown when the listener is not (or no longer) started; this also covers
            // ObjectDisposedException, which derives from InvalidOperationException.
            // Every further accept would fail the same way, so leave the loop
            // instead of spinning forever.
            return;
        }
        catch (Exception ex)
        {
            // Any other failure is treated as transient: report it and keep accepting.
            Console.WriteLine(ex.Message);
        }
    }
}

/// <summary>
/// Registers the client's stream for broadcasts, processes its incoming messages
/// until receiving fails, then unregisters the stream and disposes the client.
/// </summary>
/// <param name="client">The accepted client; it is disposed when this method finishes.</param>
private async Task StartClientListenerAsync(TcpClient client)
{
    using (client)
    {
        // One receive buffer per client, reused for every message from that client.
        var buf = new byte[1024];
        NetworkStream stream = client.GetStream();

        int userCount;
        lock (connectedUsers)
        {
            connectedUsers.Add(stream);
            // Read the count while still holding the lock so it matches this Add.
            userCount = connectedUsers.Count;
        }
        Console.WriteLine(userCount + " users connected!");

        while (true)
        {
            try
            {
                await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
            }
            catch (Exception)
            {
                // Any receive/broadcast failure ends this client's session.
                break;
            }
        }

        // List<T> is not thread-safe; removal must use the same lock as Add,
        // because many client loops run concurrently.
        lock (connectedUsers)
        {
            connectedUsers.Remove(stream);
        }
        Console.WriteLine("User disconnected.");
    }
}

/// <summary>
/// Reads one length-prefixed message from <paramref name="stream"/> into
/// <paramref name="readBuffer"/> and broadcasts it to all connected users.
/// </summary>
/// <param name="stream">The stream of the sending client.</param>
/// <param name="readBuffer">Buffer receiving the whole message, header included.</param>
/// <exception cref="System.IO.EndOfStreamException">The remote side closed the connection.</exception>
/// <exception cref="System.IO.InvalidDataException">
/// The announced length is smaller than the header or larger than the buffer.
/// </exception>
private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
{
    // The message starts with a 2-byte length that counts the header itself.
    const int headerSize = 2;
    int totalAmountRead = 0;

    // read header (length, 2 bytes total)
    while (totalAmountRead < headerSize)
    {
        int read = await stream.ReadAsync(readBuffer, totalAmountRead, headerSize - totalAmountRead).ConfigureAwait(false);
        if (read == 0)
        {
            // ReadAsync returns 0 when the peer has closed the connection;
            // without this check the loop would spin forever.
            throw new System.IO.EndOfStreamException("Connection closed by the remote host.");
        }
        totalAmountRead += read;
    }

    short totalLength = BitConverter.ToInt16(readBuffer, 0);
    if (totalLength < headerSize || totalLength > readBuffer.Length)
    {
        throw new System.IO.InvalidDataException("Invalid message length: " + totalLength);
    }

    // read rest of the message
    while (totalAmountRead < totalLength)
    {
        int read = await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
        if (read == 0)
        {
            throw new System.IO.EndOfStreamException("Connection closed by the remote host.");
        }
        totalAmountRead += read;
    }

    await SendToAllAsync(readBuffer, totalLength).ConfigureAwait(false);
}

/// <summary>
/// Writes the first <paramref name="totalLength"/> bytes of <paramref name="buffer"/>
/// to every connected user and waits until all writes have completed.
/// </summary>
/// <param name="buffer">Buffer holding the message (header included).</param>
/// <param name="totalLength">Number of valid bytes in <paramref name="buffer"/>.</param>
private async Task SendToAllAsync(byte[] buffer, short totalLength)
{
    // Snapshot the recipients under the lock: other client loops add and remove
    // streams concurrently, and enumerating a List<T> while it changes throws.
    NetworkStream[] recipients;
    lock (connectedUsers)
    {
        recipients = connectedUsers.ToArray();
    }

    if (processedRequestsAmount == 0)
    {
        sw.Start();
    }

    List<Task> tasks = new List<Task>(recipients.Length);
    foreach (NetworkStream stream in recipients)
    {
        // Send only the bytes of this message, not the whole buffer; trailing
        // bytes would corrupt the length-prefixed framing on the receiver.
        tasks.Add(stream.WriteAsync(buffer, 0, totalLength));
    }

    // NOTE(review): a failed write to any single recipient fails the whole
    // broadcast and propagates to the sender's receive loop - confirm this is intended.
    await Task.WhenAll(tasks).ConfigureAwait(false);

    // Broadcasts from different clients may finish concurrently, so count atomically.
    int processed = System.Threading.Interlocked.Increment(ref processedRequestsAmount);
    if (processed == 1000)
    {
        sw.Stop();
        Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
    }
}

还有一点 - 在 SendToAllAsync 中,您可以禁止捕获 ExecutionContext

  // SuppressFlow() throws InvalidOperationException when flow is already suppressed,
  // so it must be called once around the loop rather than once per iteration.
  // Disposing the returned AsyncFlowControl restores flow on this thread.
  using (ExecutionContext.SuppressFlow())
  {
      foreach (NetworkStream stream in connectedUsers)
      {
          tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length));
      }
  }

请阅读 https://msdn.microsoft.com/en-us/magazine/hh456402.aspx

事实证明,当我在不附加调试器的情况下运行服务器和 ClientStressTest 应用程序时(Visual Studio 中的 Ctrl+F5),服务器只需要 5 毫秒(CPU 使用率约为 30%)就能向 400 个用户发送一条消息,这比我期望的要好得多。有人可以向我解释为什么附加调试器会减慢速度吗?

无论如何,如果有人需要它来解决这个问题,这里是剩余的代码

ClientStressTest 的 Program.cs

/// <summary>
/// Stress-test client: opens <see cref="NumOfClients"/> connections to the server,
/// sends <see cref="NumOfMessages"/> messages from the first one and counts every
/// message received back on any connection.
/// </summary>
class Program
{
    static int NumOfClients = 400;
    static int NumOfMessages = 1000;

    // One stream per simulated client; index 0 is used as the sender.
    static NetworkStream[] Streams = new NetworkStream[NumOfClients];
    // The 1 kB message; its first two bytes hold the total length, the rest stays zero.
    static byte[] Message = new byte[1024];

    static void Main(string[] args)
    {
        // Write the length prefix (1024) into the first two bytes of the message.
        Buffer.BlockCopy(BitConverter.GetBytes((short)1024), 0, Message, 0, sizeof(short));
        Console.WriteLine("Press ENTER to run setup");
        Console.ReadLine();

        Setup().Wait();


        Console.WriteLine("Press ENTER to start sending");
        Console.ReadLine();


        NetworkStream sender = Streams[0];
        for (int i = 0; i < NumOfMessages; i++)
        {
            // Writes are queued without awaiting; all messages are identical.
            sender.WriteAsync(Message, 0, 1024);
        }

        Console.ReadLine();
    }

    /// <summary>
    /// Connects all clients to localhost:4000 and starts a background receive loop for each.
    /// </summary>
    static async Task Setup()
    {
        for (int i = 0; i < Streams.Length; i++)
        {
            TcpClient tcpClient = new TcpClient();
            tcpClient.Connect("localhost", 4000);
            NetworkStream stream = tcpClient.GetStream();
            Streams[i] = stream;
            Task.Run(() => CallbackListener(stream));
        }
    }

    // Total number of messages received across all clients; guarded by objLock.
    static int counter = 0;
    static object objLock = new object();

    /// <summary>
    /// Receives length-prefixed messages from <paramref name="stream"/> until the
    /// server closes the connection, counting each complete message.
    /// </summary>
    /// <param name="stream">The connection to read from.</param>
    static async Task CallbackListener(NetworkStream stream)
    {
        var readBuffer = new byte[1024];
        int totalAmountRead;
        short totalLength;

        while (true)
        {
            totalAmountRead = 0;
            while (totalAmountRead < 2)
            {
                int read = await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false);
                if (read == 0)
                {
                    // 0 bytes means the server closed the connection; stop listening
                    // instead of spinning forever on a closed stream.
                    return;
                }
                totalAmountRead += read;
            }

            totalLength = BitConverter.ToInt16(readBuffer, 0);

            while (totalAmountRead < totalLength)
            {
                int read = await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
                if (read == 0)
                {
                    return;
                }
                totalAmountRead += read;
            }

            lock(objLock)
            {
                counter++;
                if (counter % 1000 == 0)
                {
                    // to see progress
                    Console.WriteLine(counter);
                }
            }
            // do nothing
        }
    }
}

服务器的Program.cs

/// <summary>Console entry point of the chat server.</summary>
class Program
{
    static void Main(string[] args)
    {
        // Listen on every local interface on port 4000.
        const int listenPort = 4000;
        ServerEngine engine = new ServerEngine();
        engine.Start(IPAddress.Any, listenPort);
    }
}

服务器的ServerEngine.cs

/// <summary>
/// Minimal group-chat server: accepts TCP clients and broadcasts every
/// length-prefixed message it receives to all connected clients.
/// </summary>
public class ServerEngine
{
    // Streams of all connected clients; every access is guarded by lock (connectedUsers).
    private List<NetworkStream> connectedUsers = new List<NetworkStream>();
    // Number of broadcasts completed so far.
    private int processedRequestsAmount = 0;
    // Measures the time from the first broadcast until the 1000th one completes.
    private Stopwatch sw = new Stopwatch();

    public ServerEngine()
    {
    }

    /// <summary>
    /// Starts listening, kicks off the accept loop and then blocks the calling thread,
    /// printing the processed-request counter on every key press.
    /// </summary>
    /// <param name="ipAddress">Local address to bind the listener to.</param>
    /// <param name="port">Local TCP port to listen on.</param>
    public void Start(IPAddress ipAddress, int port)
    {
        TcpListener listener = new TcpListener(ipAddress, port);
        try
        {
            listener.Start();
            // The accept loop runs in the background; it is intentionally not awaited.
            AcceptClientsAsync(listener);

            while (true)
            {
                Console.ReadKey(true);
                Console.WriteLine("Processed requests: " + processedRequestsAmount);
            }
        }
        finally
        {
            listener.Stop();
            Console.WriteLine("Server stopped! Press ENTER to close application...");
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Accepts incoming clients until the listener is stopped and starts a
    /// receive loop for each accepted client.
    /// </summary>
    /// <param name="listener">A listener that has already been started.</param>
    private async Task AcceptClientsAsync(TcpListener listener)
    {
        while (true)
        {
            try
            {
                TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);
                // Each client gets its own receive loop; it is intentionally not awaited.
                StartClientListenerAsync(client);
            }
            catch (InvalidOperationException)
            {
                // The listener is not (or no longer) started; this also covers
                // ObjectDisposedException, which derives from InvalidOperationException.
                // Every further accept would fail the same way, so stop looping.
                return;
            }
            catch (Exception ex)
            {
                // Any other failure is treated as transient: report it and keep accepting.
                Console.WriteLine(ex.Message);
            }
        }
    }

    /// <summary>
    /// Registers the client's stream for broadcasts, processes its incoming messages
    /// until receiving fails, then unregisters the stream and disposes the client.
    /// </summary>
    /// <param name="client">The accepted client; disposed when this method finishes.</param>
    private async Task StartClientListenerAsync(TcpClient client)
    {
        using (client)
        {
            // One receive buffer per client, reused for every message from that client.
            var buf = new byte[1024];
            NetworkStream stream = client.GetStream();

            int userCount;
            lock (connectedUsers)
            {
                connectedUsers.Add(stream);
                // Read the count while still holding the lock so it matches this Add.
                userCount = connectedUsers.Count;
            }
            Console.WriteLine(userCount + " users connected!");

            while (true)
            {
                try
                {
                    await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    // Any receive/broadcast failure ends this client's session.
                    break;
                }
            }

            // List<T> is not thread-safe; removal must use the same lock as Add.
            lock (connectedUsers)
            {
                connectedUsers.Remove(stream);
            }
            Console.WriteLine("User disconnected.");
        }
    }

    /// <summary>
    /// Reads one length-prefixed message from <paramref name="stream"/> into
    /// <paramref name="readBuffer"/> and broadcasts it to all connected users.
    /// </summary>
    /// <param name="stream">The stream of the sending client.</param>
    /// <param name="readBuffer">Buffer receiving the whole message, header included.</param>
    /// <exception cref="System.IO.EndOfStreamException">The remote side closed the connection.</exception>
    /// <exception cref="System.IO.InvalidDataException">
    /// The announced length is smaller than the header or larger than the buffer.
    /// </exception>
    private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
    {
        // The message starts with a 2-byte length that counts the header itself.
        const int headerSize = 2;
        int totalAmountRead = 0;

        // read header (length, 2 bytes total)
        while (totalAmountRead < headerSize)
        {
            int read = await stream.ReadAsync(readBuffer, totalAmountRead, headerSize - totalAmountRead).ConfigureAwait(false);
            if (read == 0)
            {
                // ReadAsync returns 0 when the peer has closed the connection;
                // without this check the loop would spin forever.
                throw new System.IO.EndOfStreamException("Connection closed by the remote host.");
            }
            totalAmountRead += read;
        }

        short totalLength = BitConverter.ToInt16(readBuffer, 0);
        if (totalLength < headerSize || totalLength > readBuffer.Length)
        {
            throw new System.IO.InvalidDataException("Invalid message length: " + totalLength);
        }

        // read rest of the message
        while (totalAmountRead < totalLength)
        {
            int read = await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
            if (read == 0)
            {
                throw new System.IO.EndOfStreamException("Connection closed by the remote host.");
            }
            totalAmountRead += read;
        }

        await SendToAll(readBuffer, totalLength).ConfigureAwait(false);
    }

    /// <summary>
    /// Writes the first <paramref name="totalLength"/> bytes of <paramref name="buffer"/>
    /// to every connected user and waits until all writes have completed.
    /// </summary>
    /// <param name="buffer">Buffer holding the message (header included).</param>
    /// <param name="totalLength">Number of valid bytes in <paramref name="buffer"/>.</param>
    private async Task SendToAll(byte[] buffer, short totalLength)
    {
        // Snapshot the recipients under the lock: other client loops add and remove
        // streams concurrently, and enumerating a List<T> while it changes throws.
        NetworkStream[] recipients;
        lock (connectedUsers)
        {
            recipients = connectedUsers.ToArray();
        }

        if (processedRequestsAmount == 0)
        {
            sw.Start();
        }

        List<Task> tasks = new List<Task>(recipients.Length);
        foreach (NetworkStream stream in recipients)
        {
            // Send only the bytes of this message, not the whole buffer; trailing
            // bytes would corrupt the length-prefixed framing on the receiver.
            tasks.Add(stream.WriteAsync(buffer, 0, totalLength));
        }

        // NOTE(review): a failed write to any single recipient fails the whole
        // broadcast and propagates to the sender's receive loop - confirm this is intended.
        await Task.WhenAll(tasks).ConfigureAwait(false);

        // Broadcasts from different clients may finish concurrently, so count atomically.
        int processed = System.Threading.Interlocked.Increment(ref processedRequestsAmount);
        if (processed == 1000)
        {
            sw.Stop();
            Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
        }
    }
}