多线程套接字 - 可能的并发访问

Multithread socket - Possible concurrent access

我可能错了,但在我看来,在这段代码中,一个名为 _buffer 的全局变量被多个线程分配给堆上的一个新对象,所以如果一个线程正在尝试读取数据在上一个函数中写入它之后,在一个函数中从它开始,但同时另一个线程已将此变量 _buffer 分配给堆上的另一个对象,我会得到错误的数据。这是真的发生还是我错了?如果是,我该如何解决?

public class SocketServer
{
    Socket _serverSocket;
    List<Socket> _clientSocket = new List<Socket>();
    byte[] _buffer;

    public SocketServer()
    {
        _serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    }

    public void Bind(int Port)
    {
        Console.WriteLine("Setting up server...");
        _serverSocket.Bind(new IPEndPoint(IPAddress.Any, Port));
    }

    public void Listen(int BackLog)
    {
        _serverSocket.Listen(BackLog);
    }

    public void Accept()
    {
        _serverSocket.BeginAccept(AcceptCallback, null);
    }

    private void AcceptCallback(IAsyncResult AR)
    {
        Socket socket = _serverSocket.EndAccept(AR);
        _clientSocket.Add(socket);
        Console.WriteLine("Client Connected");
        _buffer = new byte[1024];
        socket.BeginReceive(_buffer, 0, _buffer.Length, SocketFlags.None, ReceiveCallback, socket);
        Accept();
    }

    private void ReceiveCallback(IAsyncResult AR)
    {
        Socket socket = AR.AsyncState as Socket;
        int bufferSize = socket.EndReceive(AR);

        string text = Encoding.ASCII.GetString(_buffer, 0, bufferSize);
        Console.WriteLine("Text Received: {0}", text);

        string response = string.Empty;

        if (text.ToLower() != "get time")
            response = $"\"{text}\" is a Invalid Request";
        else
            response = DateTime.Now.ToLongTimeString();

        byte[] data = Encoding.ASCII.GetBytes(response);
        socket.BeginSend(data, 0, data.Length, SocketFlags.None, SendCallback, socket);

        _buffer = new byte[1024];
        socket.BeginReceive(_buffer, 0, _buffer.Length, SocketFlags.None, ReceiveCallback, socket);
    }

    private void SendCallback(IAsyncResult AR)
    {
        (AR.AsyncState as Socket).EndSend(AR);
    }
}

_buffer 不是线程安全的。我会使用像 ConcurrentBag 这样的并发集合,而不是平面字节数组。这将为您保证线程安全。 如果要将 _buffer 保留为数组,则必须使用适当的锁(例如使用 lock 关键字)以确保多个线程不会同时尝试访问 _buffer。 有关 ConcurrentBag 的更多信息:https://msdn.microsoft.com/en-us/library/dd381779(v=vs.110).aspx

有一个data race涉及多个线程时,其中至少有一个是writer

Socket 是线程安全的,但 SocketServer 不是 。在使用它之前,您正在写信给 _buffer。这绝对是多线程场景中的数据竞争。每次访问共享状态都需要一个锁定机制。

如果您在传递之前立即覆盖它,那么使用 _buffer 的字段是没有意义的。如果您需要使用一个缓冲区,请在初始化时分配一次。为了避免改变太多,你可以这样实现:

class SocketServer
{
    class Transaction
    {
        public readonly byte[] Data;
        public readonly Socket Socket;

        public Transaction(byte[] data, Socket socket)
        {
            Data = data;
            Socket = socket;
        }
    }

    private readonly object _syncObj = new object();
    private readonly List<Transaction> _received = new List<Transaction>();
    //...

    //...
    private void AcceptCallback(IAsyncResult AR)
    {
        //...
        byte[] buffer = new byte[1024];
        socket.BeginReceive(
            buffer, 0, buffer.Length, SocketFlags.None,
            ReceiveCallback, new Transaction(buffer, socket));
        //...
    }
    private void ReceiveCallback(IAsyncResult AR)
    {
        Transaction trans = (Transaction)AR.AsyncState;
        Socket socket = trans.Socket;
        int bufferSize = socket.EndReceive(AR);
        lock (_syncObj) {
            _received.Add(trans);
        }
        //...
        byte[] buffer = new byte[1024];
        socket.BeginReceive(
            buffer, 0, buffer.Length, SocketFlags.None, 
            ReceiveCallback, new Transaction(buffer, socket));
    }
    //...

    // Call this to get all the received data. 
    // This will block ReceiveCallback until it completes.
    public byte[] GetReceivedData()
    {
        int totalSize = 0;
        lock (_syncObj) {
            for (int i = 0; i < _received.Length; i++) {
                totalSize += _received[i].Data.Length;
            }

            byte[] totalData = new byte[totalSize];
            int offset = 0;
            for (int i = 0; i < _received.Length; i++) {
                byte[] blockData = _received[i].Data;
                Buffer.BlockCopy(blockData, 0, totalData, offset, blockData.Length);
                offset += blockData.Length;
            }
            _received.Clear();
            return totalData;
        }
    }
}

或者,您可以创建 IList<ArraySegment<byte>> 的线程安全实现并使用适当的 overloads,但这超出了本答案的范围。

顺便提一下,您的命名约定不一致。您对字段使用下划线驼峰式大小写,对参数使用大写和帕斯卡大小写的混合,对局部变量使用驼峰式大小写。使用您想要的任何约定,但请保持一致。我建议遵循 general guidelines.