UDP 服务器的 C# await-able SocketAsyncEventArgs 包装器;socket.ReceiveAsync 返回无效的 RemoteEndPoint (0.0.0.0:0)

A C# await-able SocketAsyncEventArgs wrapper for a UDP server; socket.ReceiveAsync returns invalid RemoteEndPoint (0.0.0.0:0)

正如标题所说,我正在原始套接字上编写 UDP 服务器,并使用 SocketAsyncEventArgs,因为我想快速编写一些东西。

我知道 UdpClient 存在,并且有更简单的方法来编写服务器,但我想学习如何正确使用 SocketAsyncEventArgs 以及 socket.ReceiveFromAsync / socket.SendToAsync 方法,因为它们声称具有 'enhanced throughput' 和 'better scalability'(见 MSDN 上 SocketAsyncEventArgs 的文档)。

我基本上遵循了 MSDN 示例,因为我认为这是了解这些方法如何工作的一个不错的起点,但遇到了一些问题。服务器起初可以正常工作并回显接收到的字节,但会随机地无法('fail')从正确的地址接收字节。RemoteEndPoint 不是正确的本地主机客户端地址(例如 127.0.0.1:7007),而是被 UDP 占位符 EndPoint {0.0.0.0:0}(暂时想不到更好的术语)填充。问题截图见 Image showing the problem(控制台是波兰语,抱歉。请相信我,SocketException 消息是 "Desired Address is invalid in this context")。

我有时从 MSDN 示例中大量提取块,仅更改为 socket.ReceiveFromAsync 调用填充在 SocketAsyncEventArgs 实例上的字段(根据 MSDN 文档 -> socket.ReceiveFromAsync Docs ),结果还是一样。此外,这是一个间歇性的问题,而不是一个持续的问题。根据我的观察,服务器不会一直出错。

到目前为止,我的想法是 UdpServer 的状态问题、UdpClient 方面的一些不一致,或者 TaskCompletionSource 的误用。

编辑 1:

我觉得我应该解决为什么我使用 SocketAsyncEventArgs。我完全理解有更简单的方法来发送和接收数据。 async/await 套接字扩展是解决此问题的好方法,也是我最初使用的方法。我想将 async/await 与较旧的 api SocketArgs 进行基准测试,以了解这两种方法有多大不同。


UdpClient、UdpServer 和共享结构的代码包含在下面。如果 Stack Overflow 允许,我也可以按需提供更多代码。

感谢您抽出宝贵时间帮助我。

测试代码

/// <summary>
/// Runs the UDP echo benchmark: starts a <c>UdpServer</c> on 127.0.0.1:12345 and a <c>UdpClient</c> bound
/// to port 7007, has the client send 1,000,000 "Hello World!" datagrams (waiting for the reply to each one),
/// and prints the elapsed time and the approximate bandwidth.
/// </summary>
/// <remarks>
/// Both sides are awaited rather than blocked on, so a failure on either side is observed by the caller.
/// The method returns only after the client has finished and <c>server.StartAsync()</c> has completed too.
/// </remarks>
/// <returns>A task that completes when both the client and the server have finished.</returns>
private static async Task TestNetworking()
        {
            EndPoint serverEndPoint = new IPEndPoint(IPAddress.Loopback, 12345);

            // An async local function runs synchronously up to its first incomplete await, so the server's
            // Bind call has already happened by the time the client is started on the next line.
            Task serverTask = RunServerAsync();
            Task clientTask = RunClientAsync();

            // Re-throw the first failure as soon as it happens, then keep waiting for the other side.
            Task firstFinished = await Task.WhenAny(serverTask, clientTask);
            await firstFinished;
            await Task.WhenAll(serverTask, clientTask);

            async Task RunClientAsync()
            {
                // Private copy: the loop re-targets the endpoint after every reply, and the server side
                // must keep seeing the original value of serverEndPoint.
                EndPoint remoteEndPoint = serverEndPoint;

                SocketClient client = new UdpClient();
                bool bound = client.Bind(new IPEndPoint(IPAddress.Any, 7007));
                if (bound)
                {
                    Console.WriteLine($"[Client] Bound client socket!");
                }

                if (!bound || !client.Connect(remoteEndPoint))
                {
                    return;
                }

                Console.WriteLine($"[Client] Connected to {remoteEndPoint}!");

                byte[] message = Encoding.UTF8.GetBytes("Hello World!");

                Stopwatch stopwatch = new Stopwatch();

                const int packetsToSend = 1_000_000;

                for (int i = 0; i < packetsToSend; i++)
                {
                    try
                    {
                        stopwatch.Start();

                        await client.SendAsync(remoteEndPoint, message, SocketFlags.None);

                        ReceiveResult result = await client.ReceiveAsync(remoteEndPoint, SocketFlags.None);

                        remoteEndPoint = result.RemoteEndPoint;

                        stopwatch.Stop();
                    }
                    catch (Exception ex)
                    {
                        // Best-effort retry: log, repeat the same packet index, and back off briefly.
                        Console.WriteLine(ex);
                        i--;
                        await Task.Delay(1);
                    }
                }

                double approxBandwidth = (packetsToSend * message.Length) / (1_000_000.0 * (stopwatch.ElapsedMilliseconds / 1000.0));

                Console.WriteLine($"Sent {packetsToSend} packets of {message.Length} bytes in {stopwatch.ElapsedMilliseconds:N} milliseconds.");
                Console.WriteLine($"Approximate bandwidth: {approxBandwidth} MBps");
            }

            async Task RunServerAsync()
            {
                try
                {
                    SocketServer server = new UdpServer();

                    if (server.Bind(serverEndPoint))
                    {
                        await server.StartAsync();
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
            }
        }

共享代码

/// <summary>
/// Immutable outcome of a single datagram receive: the buffer that holds the payload, the number of
/// bytes received, and the endpoint the datagram was received from.
/// </summary>
public readonly struct ReceiveResult
    {
        /// <summary>Buffer size, in bytes, used by this sample for a single datagram.</summary>
        public const int PacketSize = 1024;

        /// <summary>Buffer containing the received data.</summary>
        public readonly Memory<byte> Contents;

        /// <summary>Number of bytes reported as received.</summary>
        public readonly int ReceivedBytes;

        /// <summary>Endpoint reported as the sender of the datagram.</summary>
        public readonly EndPoint RemoteEndPoint;

        /// <summary>Creates a result from its three components; the values are stored as given.</summary>
        /// <param name="contents">Buffer containing the received data.</param>
        /// <param name="receivedBytes">Number of bytes received.</param>
        /// <param name="remoteEndPoint">Endpoint the data was received from.</param>
        public ReceiveResult(Memory<byte> contents, int receivedBytes, EndPoint remoteEndPoint) =>
            (Contents, ReceivedBytes, RemoteEndPoint) = (contents, receivedBytes, remoteEndPoint);
    }

UDP 客户端

/// <summary>
/// UDP client built on the task-based <c>ReceiveFromAsync</c> / <c>SendToAsync</c> socket methods.
/// </summary>
public class UdpClient : SocketClient
    {
        /*
        Base class, reproduced here for reference:

        public abstract class SocketClient
        {
            protected readonly Socket socket;

            protected SocketClient(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
            {
                socket = new Socket(addressFamily, socketType, protocolType);
            }

            public bool Bind(in EndPoint localEndPoint)
            {
                try
                {
                    socket.Bind(localEndPoint);

                    return true;
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);

                    return false;
                }
            }

            public bool Connect(in EndPoint remoteEndPoint)
            {
                try
                {
                    socket.Connect(remoteEndPoint);

                    return true;
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);

                    return false;
                }
            }

            public abstract Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags);

            public abstract Task<int> SendAsync(EndPoint remoteEndPoint, ArraySegment<byte> buffer, SocketFlags socketFlags);
        }
        */

        /// <summary>Creates the client on an IPv4 datagram (UDP) socket.</summary>
        public UdpClient() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
        {
        }

        /// <summary>
        /// Receives one datagram into a newly allocated buffer of <see cref="ReceiveResult.PacketSize"/> bytes.
        /// </summary>
        /// <param name="remoteEndPoint">Endpoint passed through to <c>Socket.ReceiveFromAsync</c>.</param>
        /// <param name="socketFlags">Flags passed through to the socket call.</param>
        /// <returns>
        /// The whole receive buffer, the number of bytes actually received, and the sender's endpoint.
        /// Only the first <c>ReceivedBytes</c> bytes of <c>Contents</c> carry received data.
        /// </returns>
        public override async Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags)
        {
            byte[] buffer = new byte[ReceiveResult.PacketSize];

            SocketReceiveFromResult result =
                await socket.ReceiveFromAsync(new ArraySegment<byte>(buffer), socketFlags, remoteEndPoint);

            return new ReceiveResult(buffer, result.ReceivedBytes, result.RemoteEndPoint);
        }

        /// <summary>Sends <paramref name="buffer"/> to <paramref name="remoteEndPoint"/> as one datagram.</summary>
        /// <param name="remoteEndPoint">Destination of the datagram.</param>
        /// <param name="buffer">Payload; it must not be modified until the returned task completes.</param>
        /// <param name="socketFlags">Flags passed through to the socket call.</param>
        /// <returns>The number of bytes sent.</returns>
        public override async Task<int> SendAsync(EndPoint remoteEndPoint, ArraySegment<byte> buffer, SocketFlags socketFlags)
        {
            // The socket accepts the segment as-is, so no per-send copy of the payload is needed.
            return await socket.SendToAsync(buffer, socketFlags, remoteEndPoint);
        }
    }

UDP 服务器

/// <summary>
/// UDP echo server driven by pooled <see cref="SocketAsyncEventArgs"/> instances. Each receive and each
/// send is exposed as a task that is completed by one shared completion routine, whether the socket
/// finished the operation synchronously or asynchronously.
/// </summary>
public class UdpServer : SocketServer
    {
        /*
        Base class, reproduced here for reference:

        public abstract class SocketServer
        {
            protected readonly Socket socket;

            protected SocketServer(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
            {
                socket = new Socket(addressFamily, socketType, protocolType);
            }

            public bool Bind(in EndPoint localEndPoint)
            {
                try
                {
                    socket.Bind(localEndPoint);

                    return true;
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);

                    return false;
                }
            }

            public abstract Task StartAsync();
        }
         */

        private const int MaxPooledObjects = 100;
        private readonly ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>> clients;

        private readonly ArrayPool<byte> receiveBufferPool = ArrayPool<byte>.Create(ReceiveResult.PacketSize, MaxPooledObjects);

        private readonly ObjectPool<SocketAsyncEventArgs> receiveSocketAsyncEventArgsPool =
            new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(), MaxPooledObjects);

        private readonly ObjectPool<SocketAsyncEventArgs> sendSocketAsyncEventArgsPool =
            new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(), MaxPooledObjects);

        /// <summary>
        /// <c>Completed</c> callback for operations that finished asynchronously; dispatches on the
        /// operation type to the matching completion routine.
        /// </summary>
        private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
        {
            switch (eventArgs.LastOperation)
            {
                case SocketAsyncOperation.SendTo:
                    CompleteSend(eventArgs);
                    break;

                case SocketAsyncOperation.ReceiveFrom:
                    CompleteReceive(eventArgs);
                    break;

                case SocketAsyncOperation.Disconnect:
                    // handle the client closing the connection on tcp servers at some point
                    break;

                case SocketAsyncOperation.Accept:
                case SocketAsyncOperation.Connect:
                case SocketAsyncOperation.None:
                    break;
            }
        }

        /// <summary>
        /// Finishes a receive: copies the received bytes to the caller's buffer, gives the pooled objects
        /// back, and only then completes the task with the result or with a <see cref="SocketException"/>.
        /// </summary>
        private void CompleteReceive(SocketAsyncEventArgs args)
        {
            AsyncReadToken token = (AsyncReadToken)args.UserToken;
            SocketError error = args.SocketError;
            ReceiveResult result = default;

            if (error == SocketError.Success)
            {
                // Copy and expose only the bytes of this datagram; the rest of the rented buffer may
                // still hold data from an earlier receive.
                int received = args.BytesTransferred;
                args.MemoryBuffer.Slice(0, received).CopyTo(token.OutputBuffer);
                result = new ReceiveResult(token.OutputBuffer.Slice(0, received), received, args.RemoteEndPoint);
            }

            // Everything needed has been read from args, so it can go back to the pool before the
            // awaiting code is resumed.
            ReleaseReceive(args, token.RentedBuffer);

            if (error == SocketError.Success)
            {
                token.CompletionSource.TrySetResult(result);
            }
            else
            {
                token.CompletionSource.TrySetException(new SocketException((int)error));
            }
        }

        /// <summary>
        /// Finishes a send: gives the pooled args back, then completes the task with the number of bytes
        /// sent or with a <see cref="SocketException"/>.
        /// </summary>
        private void CompleteSend(SocketAsyncEventArgs args)
        {
            AsyncWriteToken token = (AsyncWriteToken)args.UserToken;
            SocketError error = args.SocketError;
            int sent = args.BytesTransferred;

            ReleaseSend(args);

            if (error == SocketError.Success)
            {
                token.CompletionSource.TrySetResult(sent);
            }
            else
            {
                token.CompletionSource.TrySetException(new SocketException((int)error));
            }
        }

        /// <summary>Detaches the handler and returns the receive args and its rented buffer to their pools.</summary>
        private void ReleaseReceive(SocketAsyncEventArgs args, byte[] rentedBuffer)
        {
            // The handler is attached once per operation, so it must be removed once per operation;
            // otherwise a reused instance would raise HandleIOCompleted several times.
            args.Completed -= HandleIOCompleted;
            args.UserToken = null;

            receiveBufferPool.Return(rentedBuffer);
            receiveSocketAsyncEventArgsPool.Return(args);
        }

        /// <summary>Detaches the handler and returns the send args to its pool.</summary>
        private void ReleaseSend(SocketAsyncEventArgs args)
        {
            args.Completed -= HandleIOCompleted;
            args.UserToken = null;

            sendSocketAsyncEventArgsPool.Return(args);
        }

        /// <summary>Receives one datagram into <paramref name="outputBuffer"/>.</summary>
        /// <param name="remoteEndPoint">Endpoint assigned to the args before the receive is issued.</param>
        /// <param name="socketFlags">Flags passed through to the socket call.</param>
        /// <param name="outputBuffer">Destination for the received bytes.</param>
        /// <returns>
        /// A task whose result holds the received bytes (a slice of <paramref name="outputBuffer"/>), their
        /// count, and the sender's endpoint. The task faults with a <see cref="SocketException"/> when the
        /// socket reports an error.
        /// </returns>
        private Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags, Memory<byte> outputBuffer)
        {
            // Continuations run asynchronously so the awaiting loop never resumes inside the completion routine.
            TaskCompletionSource<ReceiveResult> tcs =
                new TaskCompletionSource<ReceiveResult>(TaskCreationOptions.RunContinuationsAsynchronously);

            byte[] buffer = receiveBufferPool.Rent(ReceiveResult.PacketSize);

            // Never receive more than the caller's buffer can take, so the copy on completion cannot overrun it.
            int window = Math.Min(ReceiveResult.PacketSize, outputBuffer.Length);

            SocketAsyncEventArgs args = receiveSocketAsyncEventArgsPool.Get();
            args.SetBuffer(new Memory<byte>(buffer, 0, window));
            args.SocketFlags = socketFlags;
            args.RemoteEndPoint = remoteEndPoint;
            args.UserToken = new AsyncReadToken(buffer, outputBuffer, tcs);
            args.Completed += HandleIOCompleted;

            bool pending;
            try
            {
                pending = socket.ReceiveFromAsync(args);
            }
            catch
            {
                ReleaseReceive(args, buffer);
                throw;
            }

            if (!pending)
            {
                // Completed synchronously: the Completed event is not raised, and the operation may
                // still have failed, so run the same completion routine (which checks SocketError).
                CompleteReceive(args);
            }

            return tcs.Task;
        }

        /// <summary>Sends <paramref name="buffer"/> to <paramref name="remoteEndPoint"/> as one datagram.</summary>
        /// <param name="remoteEndPoint">Destination of the datagram.</param>
        /// <param name="buffer">Payload; it is handed to the socket without being copied.</param>
        /// <param name="socketFlags">Flags passed through to the socket call.</param>
        /// <returns>
        /// A task whose result is the number of bytes sent. The task faults with a
        /// <see cref="SocketException"/> when the socket reports an error.
        /// </returns>
        private Task<int> SendAsync(EndPoint remoteEndPoint, Memory<byte> buffer, SocketFlags socketFlags)
        {
            TaskCompletionSource<int> tcs =
                new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

            SocketAsyncEventArgs args = sendSocketAsyncEventArgsPool.Get();
            args.SetBuffer(buffer);
            args.SocketFlags = socketFlags;
            args.RemoteEndPoint = remoteEndPoint;
            args.UserToken = new AsyncWriteToken(buffer, tcs);
            args.Completed += HandleIOCompleted;

            bool pending;
            try
            {
                pending = socket.SendToAsync(args);
            }
            catch
            {
                ReleaseSend(args);
                throw;
            }

            if (!pending)
            {
                // Synchronous completion: no Completed event, so finish (and error-check) here.
                CompleteSend(args);
            }

            return tcs.Task;
        }

        /// <summary>State attached to a pending receive through <c>UserToken</c>.</summary>
        private readonly struct AsyncReadToken
        {
            public readonly TaskCompletionSource<ReceiveResult> CompletionSource;

            public readonly Memory<byte> OutputBuffer;
            public readonly byte[] RentedBuffer;

            public AsyncReadToken(byte[] rentedBuffer, Memory<byte> outputBuffer, TaskCompletionSource<ReceiveResult> tcs)
            {
                RentedBuffer = rentedBuffer;
                OutputBuffer = outputBuffer;

                CompletionSource = tcs;
            }
        }

        /// <summary>State attached to a pending send through <c>UserToken</c>.</summary>
        private readonly struct AsyncWriteToken
        {
            public readonly TaskCompletionSource<int> CompletionSource;

            public readonly Memory<byte> OutputBuffer;

            public AsyncWriteToken(Memory<byte> outputBuffer, TaskCompletionSource<int> tcs)
            {
                OutputBuffer = outputBuffer;

                CompletionSource = tcs;
            }
        }

        /// <summary>Creates the server on an IPv4 datagram (UDP) socket.</summary>
        public UdpServer() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
        {
            clients = new ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>>();
        }

        /// <summary>
        /// Echo loop: receives a datagram, logs it, sends the received bytes back to the sender, and logs
        /// the number of bytes sent. The loop has no exit condition; it ends only when an exception
        /// (for example a <see cref="SocketException"/> from a failed operation) propagates to the caller.
        /// </summary>
        public override async Task StartAsync()
        {
            // Placeholder endpoint handed to every receive call.
            EndPoint nullEndPoint = new IPEndPoint(IPAddress.Any, 0);
            byte[] receiveBuffer = new byte[ReceiveResult.PacketSize];
            Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);

            while (true)
            {
                ReceiveResult result = await ReceiveAsync(nullEndPoint, SocketFlags.None, receiveBufferMemory);

                Console.WriteLine($"[{result.RemoteEndPoint} > Server] : {Encoding.UTF8.GetString(result.Contents.Span)}");

                int sentBytes = await SendAsync(result.RemoteEndPoint, result.Contents, SocketFlags.None);
                Console.WriteLine($"[Server > {result.RemoteEndPoint}] Sent {sentBytes} bytes to {result.RemoteEndPoint}");
            }
        }
    }

我成功解决了问题!

我最终不得不合并 SocketAsyncEventArgs 池,因为事实证明您需要在接收和发送调用期间保留同一个 args 对象。现在,我的 SendToAsync 函数接受一个 SocketAsyncEventArgs 对象(在 ReceiveFromAsync 调用中从池中租用),该对象包含要向其发送响应的客户端的 RemoteEndPoint。SendToAsync 函数负责清理 SocketAsyncEventArgs,并将其归还到池中。

我之前解决方案的另一个问题是多次订阅事件。当我合并两个套接字参数池时,我保留了重复订阅事件处理程序的代码,这最终导致了问题。一旦修复,该解决方案就完全按预期工作,并且可以毫无问题地发送 1 000 000 个数据包(1Kb)。非常早期的测试(可能略有偏差)显示带宽约为每秒 5 兆字节(约每秒 40 兆比特),这是可以接受的,并且比我自己那个过于复杂的 'fast async' 版本的代码要好得多。

关于带宽,我的快速异步版本过于复杂,因此没有真正的可比性,但我相信这个 SocketAsyncEventArgs 版本可以作为基准测试和调优的良好起点,以尽可能地榨取单个套接字的性能。不过,我仍然希望得到对此的反馈,并且可能会在某个时候把它发布到 Code Review Stack Exchange,因为我怀疑解决方案中仍然存在细微的错误。

任何想使用此代码的人都可以免费使用,它最终比预期的更简单、更容易实现;但如果你愚蠢到未经充分测试就把它用于生产环境,我不承担任何责任(这毕竟只是一个学习项目)。


测试代码:

/// <summary>
/// Runs the UDP echo benchmark: starts a <c>UdpServer</c> on 127.0.0.1:12345 in the background, then has a
/// <c>UdpClient</c> bound to port 7007 send 1,000,000 "Hello World!" datagrams (waiting for the reply to
/// each one) and prints progress, the elapsed time, and the approximate bandwidth.
/// </summary>
/// <remarks>
/// The method returns when the client has finished. The server keeps running in the background; it logs
/// its own exceptions, so its task is deliberately not awaited.
/// </remarks>
/// <returns>A task that completes when the client has sent all packets.</returns>
private static async Task TestNetworking()
        {
            EndPoint serverEndPoint = new IPEndPoint(IPAddress.Loopback, 12345);

            // An async local function runs synchronously up to its first incomplete await, so the server's
            // Bind call has already happened by the time the client starts sending.
            _ = RunServerAsync();

            await RunClientAsync();

            async Task RunServerAsync()
            {
                try
                {
                    SocketServer server = new UdpServer();
                    bool bound = server.Bind(serverEndPoint);
                    if (bound)
                    {
                        Console.WriteLine($"[Server] Bound server socket!");

                        Console.WriteLine($"[Server] Starting server at {serverEndPoint}!");

                        await server.StartAsync();
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
            }

            async Task RunClientAsync()
            {
                // Private copy: the loop re-targets the endpoint after every reply, and the server side
                // must keep seeing the original value of serverEndPoint.
                EndPoint remoteEndPoint = serverEndPoint;

                SocketClient client = new UdpClient();
                bool bound = client.Bind(new IPEndPoint(IPAddress.Any, 7007));
                if (bound)
                {
                    Console.WriteLine($"[Client] Bound client socket!");
                }

                if (!bound || !client.Connect(remoteEndPoint))
                {
                    return;
                }

                Console.WriteLine($"[Client] Connected to {remoteEndPoint}!");

                byte[] message = Encoding.UTF8.GetBytes("Hello World!");
                Memory<byte> messageBuffer = new Memory<byte>(message);

                byte[] response = new byte[ReceiveResult.PacketSize];
                Memory<byte> responseBuffer = new Memory<byte>(response);

                Stopwatch stopwatch = new Stopwatch();

                const int packetsToSend = 1_000_000, statusPacketThreshold = 10_000;

                Console.WriteLine($"Started sending packets (total packet count: {packetsToSend})");

                for (int i = 0; i < packetsToSend; i++)
                {
                    if (i % statusPacketThreshold == 0)
                    {
                        Console.WriteLine($"Sent {i} packets out of {packetsToSend} ({((double)i / packetsToSend) * 100:F2}%)");
                    }

                    try
                    {
                        stopwatch.Start();

                        await client.SendAsync(remoteEndPoint, messageBuffer, SocketFlags.None);

                        ReceiveResult result = await client.ReceiveAsync(remoteEndPoint, SocketFlags.None, responseBuffer);

                        remoteEndPoint = result.RemoteEndPoint;

                        stopwatch.Stop();
                    }
                    catch (Exception ex)
                    {
                        // Best-effort retry: log, repeat the same packet index, and back off briefly.
                        Console.WriteLine(ex);
                        i--;
                        await Task.Delay(1);
                    }
                }

                double approxBandwidth = (packetsToSend * ReceiveResult.PacketSize) / (1_000_000.0 * (stopwatch.ElapsedMilliseconds / 1000.0));

                Console.WriteLine($"Sent {packetsToSend} packets of {ReceiveResult.PacketSize} bytes in {stopwatch.ElapsedMilliseconds:N} milliseconds.");
                Console.WriteLine($"Approximate bandwidth: {approxBandwidth} MBps");
            }
        }

共享代码:

/// <summary>
/// Immutable bundle of per-operation state for a receive: the rented buffer, the caller's buffer,
/// the completion source to signal, and an optional cancellation token.
/// </summary>
internal readonly struct AsyncReadToken
    {
        /// <summary>Cancellation token supplied for the operation (<c>default</c> when none was given).</summary>
        public readonly CancellationToken CancellationToken;

        /// <summary>Completion source associated with the receive.</summary>
        public readonly TaskCompletionSource<ReceiveResult> CompletionSource;

        /// <summary>Rented array associated with the receive.</summary>
        public readonly byte[] RentedBuffer;

        /// <summary>Caller-supplied buffer associated with the receive.</summary>
        public readonly Memory<byte> UserBuffer;

        /// <summary>Stores the given values unchanged.</summary>
        public AsyncReadToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<ReceiveResult> tcs,
            CancellationToken cancellationToken = default) =>
            (RentedBuffer, UserBuffer, CompletionSource, CancellationToken) =
                (rentedBuffer, userBuffer, tcs, cancellationToken);
    }

/// <summary>
/// Immutable bundle of per-operation state for a send: the rented buffer, the caller's buffer,
/// the completion source to signal, and an optional cancellation token.
/// </summary>
internal readonly struct AsyncWriteToken
    {
        /// <summary>Cancellation token supplied for the operation (<c>default</c> when none was given).</summary>
        public readonly CancellationToken CancellationToken;

        /// <summary>Completion source associated with the send.</summary>
        public readonly TaskCompletionSource<int> CompletionSource;

        /// <summary>Rented array associated with the send.</summary>
        public readonly byte[] RentedBuffer;

        /// <summary>Caller-supplied buffer associated with the send.</summary>
        public readonly Memory<byte> UserBuffer;

        /// <summary>Stores the given values unchanged.</summary>
        public AsyncWriteToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<int> tcs,
            CancellationToken cancellationToken = default) =>
            (RentedBuffer, UserBuffer, CompletionSource, CancellationToken) =
                (rentedBuffer, userBuffer, tcs, cancellationToken);
    }

/// <summary>
/// Immutable outcome of a single datagram receive: the event args used for the operation, the buffer
/// holding the data, the number of bytes received, and the endpoint the datagram was received from.
/// </summary>
public readonly struct ReceiveResult
    {
        /// <summary>Buffer size, in bytes, used by this sample for a single datagram.</summary>
        public const int PacketSize = 1024;

        /// <summary>Event args associated with the receive; may be <c>null</c>/<c>default</c>.</summary>
        public readonly SocketAsyncEventArgs ClientArgs;

        /// <summary>Buffer containing the received data.</summary>
        public readonly Memory<byte> Contents;

        /// <summary>Number of bytes reported as received.</summary>
        public readonly int Count;

        /// <summary>Endpoint reported as the sender of the datagram.</summary>
        public readonly EndPoint RemoteEndPoint;

        /// <summary>Creates a result from its components; the values are stored as given.</summary>
        /// <param name="clientArgs">Event args associated with the receive.</param>
        /// <param name="contents">Buffer containing the received data.</param>
        /// <param name="count">Number of bytes received.</param>
        /// <param name="remoteEndPoint">Endpoint the data was received from.</param>
        public ReceiveResult(SocketAsyncEventArgs clientArgs, Memory<byte> contents, int count, EndPoint remoteEndPoint) =>
            (ClientArgs, Contents, Count, RemoteEndPoint) = (clientArgs, contents, count, remoteEndPoint);
    }

服务器代码:

/// <summary>
/// Base type for the sample's socket servers: creates the <see cref="Socket"/> and offers a bind
/// operation that reports failure through its return value instead of an exception.
/// </summary>
public abstract class SocketServer
    {
        /// <summary>Socket created by the constructor and available to derived servers.</summary>
        protected readonly Socket socket;

        /// <summary>Creates the underlying socket with the given family, type and protocol.</summary>
        protected SocketServer(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
            => socket = new Socket(addressFamily, socketType, protocolType);

        /// <summary>Binds the socket to <paramref name="localEndPoint"/>.</summary>
        /// <param name="localEndPoint">Local endpoint to bind to.</param>
        /// <returns>
        /// <c>true</c> when the bind succeeded; <c>false</c> when it threw, in which case the exception
        /// has been written to the console.
        /// </returns>
        public bool Bind(in EndPoint localEndPoint)
        {
            bool succeeded = false;

            try
            {
                socket.Bind(localEndPoint);
                succeeded = true;
            }
            catch (Exception failure)
            {
                Console.WriteLine(failure);
            }

            return succeeded;
        }

        /// <summary>Starts the server; the behaviour is defined by the derived class.</summary>
        public abstract Task StartAsync();
    }

/// <summary>
/// UDP echo server built on pooled <see cref="SocketAsyncEventArgs"/> instances. One args instance serves
/// a whole receive/send round trip: <c>ReceiveAsync</c> takes it from the pool and hands it to the caller in
/// <c>ReceiveResult.ClientArgs</c>, and it goes back to the pool when <c>SendAsync</c> finishes (or
/// immediately, when the receive fails or is cancelled).
/// </summary>
public class UdpServer : SocketServer
    {
        private const int MaxPooledObjects = 1;
        private readonly ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>> clients;

        private readonly ArrayPool<byte> receiveBufferPool =
            ArrayPool<byte>.Create(ReceiveResult.PacketSize, MaxPooledObjects);

        private readonly ArrayPool<byte> sendBufferPool =
            ArrayPool<byte>.Create(ReceiveResult.PacketSize, MaxPooledObjects);

        private readonly ObjectPool<SocketAsyncEventArgs> socketAsyncEventArgsPool;

        /// <summary>
        /// Pool policy that subscribes the completion handler exactly once, when an instance is created,
        /// so every instance the pool hands out (including ones created on demand) reports completions.
        /// </summary>
        private sealed class CompletedArgsPolicy : IPooledObjectPolicy<SocketAsyncEventArgs>
        {
            private readonly EventHandler<SocketAsyncEventArgs> completedHandler;

            public CompletedArgsPolicy(EventHandler<SocketAsyncEventArgs> completedHandler)
            {
                this.completedHandler = completedHandler;
            }

            public SocketAsyncEventArgs Create()
            {
                SocketAsyncEventArgs args = new SocketAsyncEventArgs();
                args.Completed += completedHandler;

                return args;
            }

            public bool Return(SocketAsyncEventArgs obj)
            {
                return true;
            }
        }

        /// <summary>
        /// <c>Completed</c> callback for operations that finished asynchronously; dispatches on the
        /// operation type. Operations this server never issues are rejected with an exception.
        /// </summary>
        private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
        {
            switch (eventArgs.LastOperation)
            {
                case SocketAsyncOperation.SendTo:
                    CompleteSend(eventArgs);
                    break;

                case SocketAsyncOperation.ReceiveFrom:
                    CompleteReceive(eventArgs);
                    break;

                case SocketAsyncOperation.Disconnect:
                    // handle the client closing the connection on tcp servers at some point
                    break;

                case SocketAsyncOperation.Accept:
                case SocketAsyncOperation.Connect:
                case SocketAsyncOperation.None:
                case SocketAsyncOperation.Receive:
                case SocketAsyncOperation.ReceiveMessageFrom:
                case SocketAsyncOperation.Send:
                case SocketAsyncOperation.SendPackets:
                    throw new NotImplementedException();

                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        /// <summary>
        /// Finishes a receive, whether it completed synchronously or asynchronously: copies the data to the
        /// caller's buffer, returns the rented buffer, and completes the task as cancelled, faulted or
        /// successful. On success the args stay checked out for the reply; otherwise they are returned.
        /// </summary>
        private void CompleteReceive(SocketAsyncEventArgs args)
        {
            AsyncReadToken token = (AsyncReadToken)args.UserToken;
            bool canceled = token.CancellationToken.IsCancellationRequested;
            SocketError error = args.SocketError;
            bool succeeded = !canceled && error == SocketError.Success;
            ReceiveResult result = default;

            if (succeeded)
            {
                args.MemoryBuffer.CopyTo(token.UserBuffer);
                result = new ReceiveResult(args, token.UserBuffer, args.BytesTransferred, args.RemoteEndPoint);
            }

            // Give pooled objects back before resuming the awaiting code.
            receiveBufferPool.Return(token.RentedBuffer, true);

            if (!succeeded)
            {
                // No reply will be sent with these args, so nothing else would ever return them.
                socketAsyncEventArgsPool.Return(args);
            }

            if (canceled)
            {
                token.CompletionSource.TrySetCanceled();
            }
            else if (error != SocketError.Success)
            {
                token.CompletionSource.TrySetException(new SocketException((int)error));
            }
            else
            {
                token.CompletionSource.TrySetResult(result);
            }
        }

        /// <summary>
        /// Finishes a send, whether it completed synchronously or asynchronously: returns the rented buffer
        /// and the args to their pools, then completes the task as cancelled, faulted or successful.
        /// </summary>
        private void CompleteSend(SocketAsyncEventArgs args)
        {
            AsyncWriteToken token = (AsyncWriteToken)args.UserToken;
            bool canceled = token.CancellationToken.IsCancellationRequested;
            SocketError error = args.SocketError;
            int sent = args.BytesTransferred;

            sendBufferPool.Return(token.RentedBuffer, true);
            socketAsyncEventArgsPool.Return(args);

            if (canceled)
            {
                token.CompletionSource.TrySetCanceled();
            }
            else if (error != SocketError.Success)
            {
                token.CompletionSource.TrySetException(new SocketException((int)error));
            }
            else
            {
                token.CompletionSource.TrySetResult(sent);
            }
        }

        /// <summary>Receives one datagram and copies the receive buffer into <paramref name="outputBuffer"/>.</summary>
        /// <param name="remoteEndPoint">Endpoint assigned to the args before the receive is issued.</param>
        /// <param name="socketFlags">Flags passed through to the socket call.</param>
        /// <param name="outputBuffer">Destination buffer; it becomes <c>Contents</c> of the result.</param>
        /// <param name="cancellationToken">Checked when the operation completes.</param>
        /// <returns>
        /// A task whose result carries the args to reply with, the caller's buffer, the number of valid
        /// bytes (<c>Count</c>) and the sender's endpoint. The task faults with a
        /// <see cref="SocketException"/> when the socket reports an error.
        /// </returns>
        private Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags,
            Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
        {
            // Continuations run asynchronously so the awaiting loop never resumes inside the completion routine.
            TaskCompletionSource<ReceiveResult> tcs =
                new TaskCompletionSource<ReceiveResult>(TaskCreationOptions.RunContinuationsAsynchronously);

            byte[] rentedBuffer = receiveBufferPool.Rent(ReceiveResult.PacketSize);

            // Never receive more than the caller's buffer can take, so the copy on completion cannot overrun it.
            int window = Math.Min(ReceiveResult.PacketSize, outputBuffer.Length);

            SocketAsyncEventArgs args = socketAsyncEventArgsPool.Get();

            args.SetBuffer(new Memory<byte>(rentedBuffer, 0, window));
            args.SocketFlags = socketFlags;
            args.RemoteEndPoint = remoteEndPoint;
            args.UserToken = new AsyncReadToken(rentedBuffer, outputBuffer, tcs, cancellationToken);

            bool pending;
            try
            {
                pending = socket.ReceiveFromAsync(args);
            }
            catch
            {
                receiveBufferPool.Return(rentedBuffer, true);
                socketAsyncEventArgsPool.Return(args);
                throw;
            }

            if (!pending)
            {
                // Completed synchronously: the Completed event is not raised, and the operation may
                // still have failed, so run the same completion routine (which checks SocketError).
                CompleteReceive(args);
            }

            return tcs.Task;
        }

        /// <summary>
        /// Sends <paramref name="inputBuffer"/> to the endpoint stored in <paramref name="clientArgs"/>,
        /// then returns those args to the pool.
        /// </summary>
        /// <param name="clientArgs">Args obtained from a previous receive; its RemoteEndPoint is the destination.</param>
        /// <param name="inputBuffer">Payload; it is copied into a rented buffer before sending.</param>
        /// <param name="socketFlags">Flags passed through to the socket call.</param>
        /// <param name="cancellationToken">Checked when the operation completes.</param>
        /// <returns>
        /// A task whose result is the number of bytes sent. The task faults with a
        /// <see cref="SocketException"/> when the socket reports an error.
        /// </returns>
        private Task<int> SendAsync(SocketAsyncEventArgs clientArgs, Memory<byte> inputBuffer, SocketFlags socketFlags,
            CancellationToken cancellationToken = default)
        {
            TaskCompletionSource<int> tcs =
                new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

            // Rent for the actual payload size and send exactly that many bytes, not the whole rented array.
            byte[] rentedBuffer = sendBufferPool.Rent(inputBuffer.Length);
            Memory<byte> payload = new Memory<byte>(rentedBuffer, 0, inputBuffer.Length);

            inputBuffer.CopyTo(payload);

            SocketAsyncEventArgs args = clientArgs;
            args.SetBuffer(payload);
            args.SocketFlags = socketFlags;
            args.UserToken = new AsyncWriteToken(rentedBuffer, inputBuffer, tcs, cancellationToken);

            bool pending;
            try
            {
                pending = socket.SendToAsync(args);
            }
            catch
            {
                sendBufferPool.Return(rentedBuffer, true);
                socketAsyncEventArgsPool.Return(args);
                throw;
            }

            if (!pending)
            {
                // Synchronous completion: no Completed event, so finish (and error-check) here.
                CompleteSend(args);
            }

            return tcs.Task;
        }

        /// <summary>
        /// Creates the server on an IPv4 datagram (UDP) socket and sets up the args pool so that every
        /// instance it creates is wired to <see cref="HandleIOCompleted"/>.
        /// </summary>
        public UdpServer() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
        {
            clients = new ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>>();

            socketAsyncEventArgsPool = new DefaultObjectPool<SocketAsyncEventArgs>(
                new CompletedArgsPolicy(HandleIOCompleted), MaxPooledObjects);
        }

        /// <summary>
        /// Echo loop: receives a datagram and sends the contents of the receive buffer back to the sender,
        /// reusing the args of the receive. The loop has no exit condition; it ends only when an exception
        /// propagates to the caller.
        /// </summary>
        public override async Task StartAsync()
        {
            // Placeholder endpoint handed to every receive call.
            EndPoint nullEndPoint = new IPEndPoint(IPAddress.Any, 0);
            byte[] receiveBuffer = new byte[ReceiveResult.PacketSize];
            Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);

            while (true)
            {
                ReceiveResult result = await ReceiveAsync(nullEndPoint, SocketFlags.None, receiveBufferMemory);

                //Console.WriteLine($"[{result.RemoteEndPoint} > Server] : {Encoding.UTF8.GetString(result.Contents.Span)}");

                int sentBytes = await SendAsync(result.ClientArgs, result.Contents, SocketFlags.None);

                //Console.WriteLine($"[Server > {result.RemoteEndPoint}] Sent {sentBytes} bytes to {result.RemoteEndPoint}");
            }
        }
    }

客户端代码:

/// <summary>
/// Base type for the sample's socket clients: creates the <see cref="Socket"/> and offers bind and
/// connect operations that report failure through their return value instead of an exception.
/// </summary>
public abstract class SocketClient
    {
        /// <summary>Socket created by the constructor and available to derived clients.</summary>
        protected readonly Socket socket;

        /// <summary>Creates the underlying socket with the given family, type and protocol.</summary>
        protected SocketClient(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
            => socket = new Socket(addressFamily, socketType, protocolType);

        /// <summary>Binds the socket to <paramref name="localEndPoint"/>.</summary>
        /// <param name="localEndPoint">Local endpoint to bind to.</param>
        /// <returns>
        /// <c>true</c> when the bind succeeded; <c>false</c> when it threw, in which case the exception
        /// has been written to the console.
        /// </returns>
        public bool Bind(in EndPoint localEndPoint)
        {
            bool succeeded = false;

            try
            {
                socket.Bind(localEndPoint);
                succeeded = true;
            }
            catch (Exception failure)
            {
                Console.WriteLine(failure);
            }

            return succeeded;
        }

        /// <summary>Connects the socket to <paramref name="remoteEndPoint"/>.</summary>
        /// <param name="remoteEndPoint">Remote endpoint to connect to.</param>
        /// <returns>
        /// <c>true</c> when the connect succeeded; <c>false</c> when it threw, in which case the exception
        /// has been written to the console.
        /// </returns>
        public bool Connect(in EndPoint remoteEndPoint)
        {
            bool succeeded = false;

            try
            {
                socket.Connect(remoteEndPoint);
                succeeded = true;
            }
            catch (Exception failure)
            {
                Console.WriteLine(failure);
            }

            return succeeded;
        }

        /// <summary>Receives data into <paramref name="outputBuffer"/>; defined by the derived class.</summary>
        public abstract Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags,
            Memory<byte> outputBuffer);

        /// <summary>Sends <paramref name="inputBuffer"/> to <paramref name="remoteEndPoint"/>; defined by the derived class.</summary>
        public abstract Task<int> SendAsync(EndPoint remoteEndPoint, Memory<byte> inputBuffer, SocketFlags socketFlags);
    }

/// <summary>
/// UDP client built on the task-based <c>ReceiveFromAsync</c> / <c>SendToAsync</c> socket methods.
/// </summary>
public class UdpClient : SocketClient
    {
        /// <summary>Creates the client on an IPv4 datagram (UDP) socket.</summary>
        public UdpClient() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
        {
        }

        /// <summary>Receives one datagram and copies the receive buffer into <paramref name="outputBuffer"/>.</summary>
        /// <param name="remoteEndPoint">Endpoint passed through to <c>Socket.ReceiveFromAsync</c>.</param>
        /// <param name="socketFlags">Flags passed through to the socket call.</param>
        /// <param name="outputBuffer">Destination buffer; it becomes <c>Contents</c> of the result.</param>
        /// <returns>
        /// A result with no <c>ClientArgs</c>, the caller's buffer, the number of bytes received
        /// (<c>Count</c>) and the sender's endpoint.
        /// </returns>
        public override async Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags,
            Memory<byte> outputBuffer)
        {
            // Never receive more than the caller's buffer can take, so the copy below cannot overrun it.
            byte[] buffer = new byte[Math.Min(ReceiveResult.PacketSize, outputBuffer.Length)];

            SocketReceiveFromResult result =
                await socket.ReceiveFromAsync(new ArraySegment<byte>(buffer), socketFlags, remoteEndPoint);

            buffer.CopyTo(outputBuffer);

            return new ReceiveResult(default, outputBuffer, result.ReceivedBytes, result.RemoteEndPoint);
        }

        /// <summary>Sends <paramref name="buffer"/> to <paramref name="remoteEndPoint"/> as one datagram.</summary>
        /// <param name="remoteEndPoint">Destination of the datagram.</param>
        /// <param name="buffer">Payload; it must not be modified until the returned task completes.</param>
        /// <param name="socketFlags">Flags passed through to the socket call.</param>
        /// <returns>The number of bytes sent.</returns>
        public override async Task<int> SendAsync(EndPoint remoteEndPoint, Memory<byte> buffer, SocketFlags socketFlags)
        {
            // Use the array behind the Memory directly when there is one; copy only as a fallback for
            // memory that is not array-backed.
            if (!System.Runtime.InteropServices.MemoryMarshal.TryGetArray<byte>(buffer, out ArraySegment<byte> segment))
            {
                segment = new ArraySegment<byte>(buffer.ToArray());
            }

            return await socket.SendToAsync(segment, socketFlags, remoteEndPoint);
        }
    }