What would cause a TCP socket to receive 0 bytes but have bytes available in an async socket server?

I have a C#/.NET data server based on the example provided by Microsoft:

https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socketasynceventargs?view=net-5.0

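I have had this data server working for many years with a simple device listener that processes the data devices send. In the previous use case, a device sent data to the server, the server sent back a response, and the device then closed the connection. This worked fine.

I am now adding the ability for a device to send additional data after it receives the response. This works the first time. After that, however, subsequent messages complete with zero bytes transferred and SocketError still set to Success, yet Available shows the correct amount of data sent by the client device. The data server therefore treats this as a closed connection. I don't understand why the server is not receiving the data, even though Wireshark shows that the data was transmitted and the available byte count on the socket shows the bytes are waiting to be received. It is as if the server gets into a bad state in which it cannot actually receive the bytes that are waiting.

Here is the code for the data server: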

using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

namespace DeviceListener
{
    class SocketAsyncUserToken
    {
        public Socket Socket;
        public DateTime AcceptedAt;
        public DateTime LastUpdate;
        public bool Closed = false;
        public int TotalBytes = 0;
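        public string DeviceId = null;
        // Reset in ProcessAccept; declared here so the posted code compiles.
        public bool WaitingForControlResponse = false;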
        public byte[] ReceiveBuffer = new byte[2048];
        public byte[] SendBuffer = new byte[2048];
    }

    public enum DataServerEventTypes
    {
        None = 0,
        Unknown,
        Error,
        Warning,
        Info,
        ServerStarted,
        ServerStopped,
        ConnectionAccepted,
        ConnectionClosed,
        ConnectionRefused,
        DataReceived,
        DataSent,
        MessageHandled
    }

    public class DataServerEventArgs : EventArgs
    {
        public DataServerEventTypes EventType;
        public string DeviceId;
        public string Details;
        public Exception Exception;
    }

    // Implements the connection logic for the socket server.   
    // After accepting a connection, all data read from the client  
    // is sent back to the client. The read and echo back to the client pattern  
    // is continued until the client disconnects. 
    public class SocketAsyncServer
    {
        private int m_numConnections;   // the maximum number of connections the sample is designed to handle simultaneously  
        private int m_receiveBufferSize;// buffer size to use for each socket I/O operation 
        SocketAsyncBufferManager m_bufferManager;  // represents a large reusable set of buffers for all socket operations 
        const int opsToPreAlloc = 2;    // read, write (don't alloc buffer space for accepts)
        Socket listenSocket;            // the socket used to listen for incoming connection requests 
        // pool of reusable SocketAsyncEventArgs objects for write, read and accept socket operations
        SocketAsyncEventArgsPool m_readWritePool;
        int m_totalBytesRead;           // counter of the total # bytes received by the server 
        int m_numConnectedSockets;      // the total number of clients connected to the server 
        Semaphore m_maxNumberAcceptedClients;
        string m_AuthenticationId;
        List<SocketAsyncEventArgs> m_AcceptedConnections;

        public event EventHandler<DataServerEventArgs> EventNotify;

        // Create an uninitialized server instance.   
        // To start the server listening for connection requests 
        // call the Init method followed by Start method  
        // 
        // <param name="numConnections">the maximum number of connections the sample is designed to handle simultaneously</param>
        // <param name="receiveBufferSize">buffer size to use for each socket I/O operation</param>
        public SocketAsyncServer(int numConnections, int receiveBufferSize, string authenticationId)
        {
            m_totalBytesRead = 0;
            m_numConnectedSockets = 0;
            m_numConnections = numConnections;
            m_receiveBufferSize = receiveBufferSize;
            m_AuthenticationId = authenticationId;
            // allocate buffers such that the maximum number of sockets can have one outstanding read and  
            //write posted to the socket simultaneously  
            m_bufferManager = new SocketAsyncBufferManager(receiveBufferSize * numConnections * opsToPreAlloc, receiveBufferSize);

            m_readWritePool = new SocketAsyncEventArgsPool(numConnections);
            m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);
            m_AcceptedConnections = new List<SocketAsyncEventArgs>();
        }

        /// <summary>
        /// Raises the EventNotify event to notify subscribers of server activity.
        /// </summary>
        /// <param name="e"></param>
        public void OnEventNotify(DataServerEventArgs e)
        {
            EventNotify?.Invoke(this, e);
        }

        // Initializes the server by preallocating reusable buffers and  
        // context objects.  These objects do not need to be preallocated  
        // or reused, but it is done this way to illustrate how the API can  
        // easily be used to create reusable objects to increase server performance. 
        // 
        public void Init()
        {
            // Allocates one large byte buffer which all I/O operations use a piece of.  This guards
            // against memory fragmentation
            m_bufferManager.InitBuffer();

            // preallocate pool of SocketAsyncEventArgs objects
            SocketAsyncEventArgs readWriteEventArg;

            for (int i = 0; i < m_numConnections; i++)
            {
                //Pre-allocate a set of reusable SocketAsyncEventArgs
                readWriteEventArg = new SocketAsyncEventArgs();
                readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(SendReceive_Completed);
                readWriteEventArg.UserToken = new SocketAsyncUserToken();

                // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object
                m_bufferManager.SetBuffer(readWriteEventArg);

                // add SocketAsyncEventArg to the pool
                m_readWritePool.Push(readWriteEventArg);
            }
        }

        // Starts the server such that it is listening for
        // incoming connection requests.
        //
        // <param name="localEndPoint">The endpoint on which the server will listen
        // for connection requests</param>
        public bool Start(IPEndPoint localEndPoint)
        {
            DataServerEventArgs args = new DataServerEventArgs();
            args.EventType = DataServerEventTypes.ServerStarted;
            args.Details = "Server is starting on port " + localEndPoint.Port + ".";
            OnEventNotify(args);

            // create the socket which listens for incoming connections
            listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            listenSocket.Bind(localEndPoint);
            // start the server with a listen backlog of 100 connections
            listenSocket.Listen(100);

            // post accepts on the listening socket
            StartAccept(null);

            return true;
        }

        public bool Stop()
        {
            List<SocketAsyncEventArgs> connectionsToClose = new List<SocketAsyncEventArgs>();
            foreach (SocketAsyncEventArgs s in m_AcceptedConnections)
            {
                if (s.ConnectSocket != null)
                {
                    DataServerEventArgs args = new DataServerEventArgs();
                    args.EventType = DataServerEventTypes.ConnectionClosed;
                    args.Details = "The server has forced closed socket " + s.ConnectSocket.Handle.ToInt32() + " due to shutdown.";
                    OnEventNotify(args);
                    connectionsToClose.Add(s);
                }
            }
            if (connectionsToClose.Count > 0)
            {
                foreach (SocketAsyncEventArgs s in connectionsToClose)
                {
                    CloseClientSocket(s);
                }
            }
            return true;
        }

        // Begins an operation to accept a connection request from the client  
        // 
        // <param name="acceptEventArg">The context object to use when issuing 
        // the accept operation on the server's listening socket</param> 
        public void StartAccept(SocketAsyncEventArgs acceptEventArg)
        {
            if (acceptEventArg == null)
            {
                acceptEventArg = new SocketAsyncEventArgs();
                acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
            }
            else
            {
                // socket must be cleared since the context object is being reused
                acceptEventArg.AcceptSocket = null;
            }

            DateTime currentTime = DateTime.Now;
            SocketAsyncUserToken token = null;
            TimeSpan ts = new TimeSpan(0, 1, 0);
            List<SocketAsyncEventArgs> connectionsToClose = new List<SocketAsyncEventArgs>();
            foreach (SocketAsyncEventArgs s in m_AcceptedConnections)
            {
                token = (SocketAsyncUserToken)s.UserToken;
                if (currentTime - token.LastUpdate > ts)
                {
                    DataServerEventArgs args = new DataServerEventArgs();
                    args.EventType = DataServerEventTypes.ConnectionClosed;
                    args.Details = "The server has forced closed socket " + token.Socket.Handle.ToInt32() + " due to timeout.";
                    OnEventNotify(args);
                    connectionsToClose.Add(s);
                }
            }
            if (connectionsToClose.Count > 0)
            {
                foreach (SocketAsyncEventArgs s in connectionsToClose)
                {
                    CloseClientSocket(s);
                }
            }
            m_maxNumberAcceptedClients.WaitOne();
            bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
            if (!willRaiseEvent)
            {
                ProcessAccept(acceptEventArg);
            }
        }

        // This method is the callback method associated with Socket.AcceptAsync  
        // operations and is invoked when an accept operation is complete 
        // 
        void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
        {
            ProcessAccept(e);
        }

        private void ProcessAccept(SocketAsyncEventArgs e)
        {
            Interlocked.Increment(ref m_numConnectedSockets);
            DataServerEventArgs args = new DataServerEventArgs();
            args.EventType = DataServerEventTypes.ConnectionAccepted;
            args.Details = "The server has accepted a connection on socket " + e.AcceptSocket.Handle.ToInt32() + ".";
            OnEventNotify(args);
            args.EventType = DataServerEventTypes.ConnectionAccepted;
            args.Details = "There are " + m_numConnectedSockets + " clients connected.";
            OnEventNotify(args);

            // Get the socket for the accepted client connection and put it into the  
            //ReadEventArg object user token
            SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
            SocketAsyncUserToken token = (SocketAsyncUserToken)readEventArgs.UserToken;
            token.Socket = e.AcceptSocket;
            token.AcceptedAt = DateTime.Now;
            token.LastUpdate = token.AcceptedAt;
            token.TotalBytes = 0;
            token.Closed = false;
            token.DeviceId = null;
            token.WaitingForControlResponse = false;

            m_AcceptedConnections.Add(readEventArgs);

            // As soon as the client is connected, post a receive to the connection 
            bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
            if (!willRaiseEvent)
            {
                ProcessReceive(readEventArgs);
            }

            // Accept the next connection request
            StartAccept(e);
        }

        // This method is called whenever a receive or send operation is completed on a socket  
        // 
        // <param name="e">SocketAsyncEventArg associated with the completed receive operation</param>
        void SendReceive_Completed(object sender, SocketAsyncEventArgs e)
        {
            // determine which type of operation just completed and call the associated handler 
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
                default:
                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
            }

        }

        // This method is invoked when an asynchronous receive operation completes.  
        // If the remote host closed the connection, then the socket is closed.   
        // 
        private void ProcessReceive(SocketAsyncEventArgs e)
        {
            // check if the remote host closed the connection
            SocketAsyncUserToken token = (SocketAsyncUserToken)e.UserToken;
            if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
            {
                //increment the count of the total bytes received by the server
                Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred);
                Buffer.BlockCopy(e.Buffer, e.Offset, token.ReceiveBuffer, token.TotalBytes, e.BytesTransferred);

                byte[] raw = new byte[e.BytesTransferred];

                Array.Copy(e.Buffer, e.Offset, raw, 0, e.BytesTransferred);

                string raw_bytes = BitConverter.ToString(raw);
                token.TotalBytes += e.BytesTransferred;
                token.LastUpdate = DateTime.Now;

                DataServerEventArgs args = null;
                
                args = new DataServerEventArgs();
                args.EventType = DataServerEventTypes.DataReceived;
                args.Details = "Raw Bytes:\r\n" + raw_bytes + "\r\nThe server has read a total of " + token.TotalBytes + " bytes on socket " + token.Socket.Handle.ToInt32() + ".";
                OnEventNotify(args);
                
                bool eot = false;
                int index = 0;
                for (int i = e.Offset; i < e.Offset + e.BytesTransferred; i++)
                {
                    if (e.Buffer[i] == 4)
                    {
                        // skip the eot character
                        index++;
                        
                        eot = true;
                        break;
                    }
                    index++;
                }

                // check for End of Transmission (EOT) byte
                if (eot)
                {
                    string messageStr = null;
                    string messageType = null;

                    messageStr = System.Text.Encoding.ASCII.GetString(token.ReceiveBuffer, 0, token.TotalBytes);
                    token.DeviceId = messageStr.Substring(0, 3);
                    messageType = messageStr.Substring(3, 3);

                    int count = 0;

                    // generate response
                    count = Message.Response(token);

                    token.TotalBytes = 0;

                    // send response
                    e.SetBuffer(token.SendBuffer, 0, count);

                    args = new DataServerEventArgs();
                    args.EventType = DataServerEventTypes.DataSent;
                    args.DeviceId = token.DeviceId;
                    args.Details = "Sending response of " + count + " bytes for " + token.DeviceId + " on socket " + token.Socket.Handle.ToInt32() + ".";
                    OnEventNotify(args);
                    bool willRaiseEvent = token.Socket.SendAsync(e);
                    if (!willRaiseEvent)
                    {
                        ProcessSend(e);
                    }             
                }
                else
                {
                    bool willRaiseEvent = token.Socket.ReceiveAsync(e);
                    if (!willRaiseEvent)
                    {
                        ProcessReceive(e);
                    }
                }
            }
            else if (token.Socket.Available > 0)
            {
                // why is the data not being received?
                var args = new DataServerEventArgs();
                args.EventType = DataServerEventTypes.Error;
                args.Details = "Data not received but is available on socket " + token.Socket.Handle.ToInt32() + ", Transferred: " + e.BytesTransferred + ", Available: " + token.Socket.Available + ", SocketError: " + e.SocketError + ".";
                OnEventNotify(args);
                 
                // this results in stack overflow because BytesTransferred is always 0
                /*
                bool willRaiseEvent = token.Socket.ReceiveAsync(e);
                if (!willRaiseEvent)
                {
                    ProcessReceive(e);
                }
                */
            }
            // SocketError.Success and e.BytesTransferred == 0 means client disconnected
            else
            {
                var args = new DataServerEventArgs();
                args.EventType = DataServerEventTypes.ConnectionClosed;
                args.Details = "Closing socket " + token.Socket.Handle.ToInt32() + ", " + e.SocketError + ".";
                OnEventNotify(args);
                CloseClientSocket(e);
            }
        }

        // This method is invoked when an asynchronous send operation completes.   
        // The method issues another receive on the socket to read any additional  
        // data sent from the client 
        // 
        // <param name="e"></param>
        private void ProcessSend(SocketAsyncEventArgs e)
        {
            if (e.SocketError == SocketError.Success)
            {
                SocketAsyncUserToken token = (SocketAsyncUserToken)e.UserToken;
                // read the next block of data sent from the client
                bool willRaiseEvent = token.Socket.ReceiveAsync(e);
                if (!willRaiseEvent)
                {
                    ProcessReceive(e);
                }
            }
            else
            {
                CloseClientSocket(e);
            }
        }

        private void CloseClientSocket(SocketAsyncEventArgs e)
        {
            SocketAsyncUserToken token = e.UserToken as SocketAsyncUserToken;

            // close the socket associated with the client 
            try
            {
                if (token.Closed)
                {
                    return;
                }
                else
                {
                    token.Closed = true;
                }
                token.Socket.Shutdown(SocketShutdown.Send);
                DataServerEventArgs args1 = new DataServerEventArgs();
                args1.EventType = DataServerEventTypes.ConnectionClosed;
                args1.Details = "The server has shutdown socket " + token.Socket.Handle.ToInt32() + " successfully.";
                OnEventNotify(args1);
            }
            // throws if client process has already closed 
            catch (Exception)
            {
                DataServerEventArgs args2 = new DataServerEventArgs();
                args2.EventType = DataServerEventTypes.ConnectionClosed;
                args2.Details = "The socket " + token.Socket.Handle.ToInt32() + " was already shutdown.";
                OnEventNotify(args2);
                return;
            }
            token.Socket.Close();

            // decrement the counter keeping track of the total number of clients connected to the server
            Interlocked.Decrement(ref m_numConnectedSockets);
            m_maxNumberAcceptedClients.Release();
            DataServerEventArgs args = new DataServerEventArgs();
            args.EventType = DataServerEventTypes.ConnectionClosed;
            args.Details = "There are " + m_numConnectedSockets + " clients connected.";
            OnEventNotify(args);

            m_AcceptedConnections.Remove(e);

            // Free the SocketAsyncEventArg so they can be reused by another client
            m_readWritePool.Push(e);
        }
    }    
}

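Any thoughts on why this problem occurs? Everything works as long as there is only one exchange back and forth. The problem arises when there is a second exchange on the same connection.

Edit: added logging output to demonstrate the problem: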

Server is starting on port 15027.
The server has accepted a connection on socket 1068.
There are 1 clients connected.
Raw Bytes:
7E-39-39-01-02-31-03-30-35-02-03-02-5B-30-39-2F-31-34-2F-32-31-20-31-36-3A-33-37-3A-32-38-20-33-5D-20-44-65-76-69-63-65-20-73-74-61-72-74-75-70-2E-20-5B-30-5D-03-04
The server has read a total of 55 bytes on socket 1068.
Message of type Notification received from ~99 on socket 1068.
Message handled with response: DRX, 09/14/21 16:37:28 3 for ~99 on socket 1068.
Sending response of 28 bytes for ~99 on socket 1068.
Closing socket 1068, Success.
The server has shutdown socket 1068 successfully.
There are 0 clients connected.
The server has accepted a connection on socket 1092.
There are 1 clients connected.
Raw Bytes:
7E-39-39-03-04
The server has read a total of 5 bytes on socket 1092.
Message of type ControlRequest received from ~99 on socket 1092.
Message handled with response: 1, D1L for ~99 on socket 1092.
Sending response of 7 bytes for ~99 on socket 1092.
Raw Bytes:
44-31-4C-04
The server has read a total of 4 bytes on socket 1092.
Sending response of 0 bytes for ~99 on socket 1092.
Closing socket 1092, Success.
The server has shutdown socket 1092 successfully.
There are 0 clients connected.
The server has accepted a connection on socket 2140.
There are 1 clients connected.
Data not transferred but is available on socket 2140, Transferred: 0, Available: 61, SocketError: Success.
The server has accepted a connection on socket 2244.
There are 2 clients connected.
Raw Bytes:
7E-39-39-03-04
The server has read a total of 5 bytes on socket 2244.
Message of type ControlRequest received from ~99 on socket 2244.
Message handled with response: 1, D1L for ~99 on socket 2244.
Sending response of 7 bytes for ~99 on socket 2244.
Raw Bytes:
44-31-4C-04
The server has read a total of 4 bytes on socket 2244.
Sending response of 0 bytes for ~99 on socket 2244.
Closing socket 2244, Success.
The server has shutdown socket 2244 successfully.
There are 1 clients connected.
The server has accepted a connection on socket 2248.
There are 2 clients connected.
Data not transferred but is available on socket 2248, Transferred: 0, Available: 5, SocketError: Success.
The server has accepted a connection on socket 2256.
There are 3 clients connected.
Raw Bytes:
7E-39-39-02-05-06-02-30-2E-30-30-30-35-35-35-03-1C-02-30-2E-30-30-30-30-30-30-03-13-02-31-2E-30-30-30-30-30-30-03-1F-02-30-2E-30-30-30-30-30-30-03-44-02-2D-31-30-30-2E-30-30-30-03-04
The server has read a total of 61 bytes on socket 2256.
Message of type DataSample received from ~99 on socket 2256.
Message handled with response: DRX, Acknowledged for ~99 on socket 2256.
Sending response of 21 bytes for ~99 on socket 2256.
Closing socket 2256, Success.
The server has shutdown socket 2256 successfully.
There are 2 clients connected.
The server has forced closed socket 2140 due to timeout.
The server has shutdown socket 2140 successfully.
There are 1 clients connected.
The server has accepted a connection on socket 2140.
There are 2 clients connected.
Data not transferred but is available on socket 2140, Transferred: 0, Available: 5, SocketError: Success.

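It looks like you are performing a zero-byte receive. This can be useful in some scenarios to detect when data becomes available without reserving a buffer for the receive.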

int count = 0;
//...
e.SetBuffer(token.SendBuffer, 0, count);
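
To see how a zero-length buffer can produce the reported symptom, here is a minimal, self-contained sketch over a loopback connection. It is not taken from the question's server: the class name ZeroByteReceiveDemo, the helper ReceiveOnce, and the blocking wait are assumptions made to keep the demo short, and it assumes a compiler with C# 8 using declarations. The 5 bytes sent are the control request shown in the log.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

static class ZeroByteReceiveDemo
{
    // Posts one receive and blocks until it completes (blocking is for demo brevity only).
    static int ReceiveOnce(Socket socket, SocketAsyncEventArgs e, ManualResetEventSlim done)
    {
        done.Reset();
        if (socket.ReceiveAsync(e))
        {
            done.Wait(); // completion is pending: wait for the Completed event
        }
        return e.BytesTransferred;
    }

    public static (int ZeroLength, int Available, int FullLength) Run()
    {
        using var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); // port 0: let the OS pick a free port
        listener.Listen(1);

        using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        client.Connect(listener.LocalEndPoint);
        using Socket server = listener.Accept();

        // The same 5-byte control request that appears in the question's log.
        client.Send(new byte[] { 0x7E, 0x39, 0x39, 0x03, 0x04 });

        var buffer = new byte[2048];
        using var done = new ManualResetEventSlim(false);
        using var e = new SocketAsyncEventArgs();
        e.Completed += (sender, args) => done.Set();

        // Zero-length window, as left behind by SetBuffer(..., 0, count) with count == 0.
        e.SetBuffer(buffer, 0, 0);
        int zeroLength = ReceiveOnce(server, e, done);
        int available = server.Available;

        // Positive-length window: the pending bytes are actually read.
        e.SetBuffer(buffer, 0, buffer.Length);
        int fullLength = ReceiveOnce(server, e, done);

        return (zeroLength, available, fullLength);
    }

    public static void Main()
    {
        var r = Run();
        Console.WriteLine($"Zero-length receive: Transferred: {r.ZeroLength}, Available: {r.Available}");
        Console.WriteLine($"Full-length receive: Transferred: {r.FullLength}");
    }
}
```

The first receive is expected to complete with nothing transferred while Available is still positive, which is the combination the "Data not transferred but is available" log lines report. The second receive, with a positive length, should pick up the waiting bytes.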

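Make that last parameter greater than zero. If this is for a send, make sure you are sending what you intended to send; if this is for a receive, it needs to be positive to actually receive data (rather than just detect that data is present). If you use the same args for successive sends and receives, make sure you set the buffer length appropriately before each operation.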
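
One way to apply this is to restore the receive window on the args immediately before every ReceiveAsync call. The sketch below is an assumption about how that could look, not the original server's code: ReceiveWindow and RestoreReceiveWindow are hypothetical names, and the window values stand in for whatever offset and length the buffer manager assigned when the pool was built.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical stand-in for the question's user token: it remembers the
// receive window that was assigned to the args when the pool was built.
sealed class ReceiveWindow
{
    public byte[] Buffer = Array.Empty<byte>();
    public int Offset;
    public int Count;
}

static class ReceiveHelper
{
    // Point the args back at the full receive window. Call this before every
    // ReceiveAsync, because an earlier SetBuffer for a send may have shrunk it.
    public static void RestoreReceiveWindow(SocketAsyncEventArgs e, ReceiveWindow window)
    {
        e.SetBuffer(window.Buffer, window.Offset, window.Count);
    }

    public static void Main()
    {
        var window = new ReceiveWindow { Buffer = new byte[2048], Offset = 0, Count = 2048 };
        var sendBuffer = new byte[2048];

        using var e = new SocketAsyncEventArgs();

        // Send setup for an empty response, like "Sending response of 0 bytes" in the log.
        e.SetBuffer(sendBuffer, 0, 0);
        Console.WriteLine("Window length after send setup: " + e.Count);

        // Restore the window before the next receive is posted.
        RestoreReceiveWindow(e, window);
        Console.WriteLine("Window length before next receive: " + e.Count);
    }
}
```

In the question's code, the natural places for such a call would be ProcessSend, before token.Socket.ReceiveAsync(e), and ProcessAccept, before the first receive. The second matters because CloseClientSocket pushes the args back into the pool without resetting the buffer, so a newly accepted connection appears to inherit the previous connection's send window. That would explain why sockets 2140 and 2248 in the log report zero bytes transferred on their very first receive.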