多线程套接字 - 可能的并发访问
Multithread socket - Possible concurrent access
我可能错了,但在我看来,在这段代码中,一个名为 _buffer
的全局变量被多个线程分配给堆上的一个新对象,所以如果一个线程正在尝试读取数据在上一个函数中写入它之后,在一个函数中从它开始,但同时另一个线程已将此变量 _buffer 分配给堆上的另一个对象,我会得到错误的数据。这是真的发生还是我错了?如果是,我该如何解决?
public class SocketServer
{
Socket _serverSocket;
List<Socket> _clientSocket = new List<Socket>();
byte[] _buffer;
public SocketServer()
{
_serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
}
public void Bind(int Port)
{
Console.WriteLine("Setting up server...");
_serverSocket.Bind(new IPEndPoint(IPAddress.Any, Port));
}
public void Listen(int BackLog)
{
_serverSocket.Listen(BackLog);
}
public void Accept()
{
_serverSocket.BeginAccept(AcceptCallback, null);
}
private void AcceptCallback(IAsyncResult AR)
{
Socket socket = _serverSocket.EndAccept(AR);
_clientSocket.Add(socket);
Console.WriteLine("Client Connected");
_buffer = new byte[1024];
socket.BeginReceive(_buffer, 0, _buffer.Length, SocketFlags.None, ReceiveCallback, socket);
Accept();
}
private void ReceiveCallback(IAsyncResult AR)
{
Socket socket = AR.AsyncState as Socket;
int bufferSize = socket.EndReceive(AR);
string text = Encoding.ASCII.GetString(_buffer, 0, bufferSize);
Console.WriteLine("Text Received: {0}", text);
string response = string.Empty;
if (text.ToLower() != "get time")
response = $"\"{text}\" is a Invalid Request";
else
response = DateTime.Now.ToLongTimeString();
byte[] data = Encoding.ASCII.GetBytes(response);
socket.BeginSend(data, 0, data.Length, SocketFlags.None, SendCallback, socket);
_buffer = new byte[1024];
socket.BeginReceive(_buffer, 0, _buffer.Length, SocketFlags.None, ReceiveCallback, socket);
}
private void SendCallback(IAsyncResult AR)
{
(AR.AsyncState as Socket).EndSend(AR);
}
}
_buffer 不是线程安全的。我会使用像 ConcurrentBag 这样的并发集合,而不是平面字节数组。这将为您保证线程安全。
如果要将 _buffer 保留为数组,则必须使用适当的锁(例如使用 lock 关键字)以确保多个线程不会同时尝试访问 _buffer。
有关 ConcurrentBag 的更多信息:https://msdn.microsoft.com/en-us/library/dd381779(v=vs.110).aspx
有一个data race涉及多个线程时,其中至少有一个是writer
Socket
是线程安全的,但 SocketServer
不是 。在使用它之前,您正在写信给 _buffer
。这绝对是多线程场景中的数据竞争。每次访问共享状态都需要一个锁定机制。
如果您在传递之前立即覆盖它,那么使用 _buffer
的字段是没有意义的。如果您需要使用一个缓冲区,请在初始化时分配一次。为了避免改变太多,你可以这样实现:
class SocketServer
{
class Transaction
{
public readonly byte[] Data;
public readonly Socket Socket;
public Transaction(byte[] data, Socket socket)
{
Data = data;
Socket = socket;
}
}
private readonly object _syncObj = new object();
private readonly List<Transaction> _received = new List<Transaction>();
//...
//...
private void AcceptCallback(IAsyncResult AR)
{
//...
byte[] buffer = new byte[1024];
socket.BeginReceive(
buffer, 0, buffer.Length, SocketFlags.None,
ReceiveCallback, new Transaction(buffer, socket));
//...
}
private void ReceiveCallback(IAsyncResult AR)
{
Transaction trans = (Transaction)AR.AsyncState;
Socket socket = trans.Socket;
int bufferSize = socket.EndReceive(AR);
lock (_syncObj) {
_received.Add(trans);
}
//...
byte[] buffer = new byte[1024];
socket.BeginReceive(
buffer, 0, buffer.Length, SocketFlags.None,
ReceiveCallback, new Transaction(buffer, socket));
}
//...
// Call this to get all the received data.
// This will block ReceiveCallback until it completes.
public byte[] GetReceivedData()
{
int totalSize = 0;
lock (_syncObj) {
for (int i = 0; i < _received.Length; i++) {
totalSize += _received[i].Data.Length;
}
byte[] totalData = new byte[totalSize];
int offset = 0;
for (int i = 0; i < _received.Length; i++) {
byte[] blockData = _received[i].Data;
Buffer.BlockCopy(blockData, 0, totalData, offset, blockData.Length);
offset += blockData.Length;
}
_received.Clear();
return totalData;
}
}
}
或者,您可以创建 IList<ArraySegment<byte>>
的线程安全实现并使用适当的 overloads,但这超出了本答案的范围。
顺便提一下,您的命名约定不一致。您对字段使用下划线驼峰式大小写,对参数使用大写和帕斯卡大小写的混合,对局部变量使用驼峰式大小写。使用您想要的任何约定,但请保持一致。我建议遵循 general guidelines.
我可能错了,但在我看来,在这段代码中,一个名为 _buffer
的全局变量被多个线程分配给堆上的一个新对象,所以如果一个线程正在尝试读取数据在上一个函数中写入它之后,在一个函数中从它开始,但同时另一个线程已将此变量 _buffer 分配给堆上的另一个对象,我会得到错误的数据。这是真的发生还是我错了?如果是,我该如何解决?
public class SocketServer
{
Socket _serverSocket;
List<Socket> _clientSocket = new List<Socket>();
byte[] _buffer;
public SocketServer()
{
_serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
}
public void Bind(int Port)
{
Console.WriteLine("Setting up server...");
_serverSocket.Bind(new IPEndPoint(IPAddress.Any, Port));
}
public void Listen(int BackLog)
{
_serverSocket.Listen(BackLog);
}
public void Accept()
{
_serverSocket.BeginAccept(AcceptCallback, null);
}
private void AcceptCallback(IAsyncResult AR)
{
Socket socket = _serverSocket.EndAccept(AR);
_clientSocket.Add(socket);
Console.WriteLine("Client Connected");
_buffer = new byte[1024];
socket.BeginReceive(_buffer, 0, _buffer.Length, SocketFlags.None, ReceiveCallback, socket);
Accept();
}
private void ReceiveCallback(IAsyncResult AR)
{
Socket socket = AR.AsyncState as Socket;
int bufferSize = socket.EndReceive(AR);
string text = Encoding.ASCII.GetString(_buffer, 0, bufferSize);
Console.WriteLine("Text Received: {0}", text);
string response = string.Empty;
if (text.ToLower() != "get time")
response = $"\"{text}\" is a Invalid Request";
else
response = DateTime.Now.ToLongTimeString();
byte[] data = Encoding.ASCII.GetBytes(response);
socket.BeginSend(data, 0, data.Length, SocketFlags.None, SendCallback, socket);
_buffer = new byte[1024];
socket.BeginReceive(_buffer, 0, _buffer.Length, SocketFlags.None, ReceiveCallback, socket);
}
private void SendCallback(IAsyncResult AR)
{
(AR.AsyncState as Socket).EndSend(AR);
}
}
_buffer 不是线程安全的。我会使用像 ConcurrentBag 这样的并发集合,而不是平面字节数组。这将为您保证线程安全。 如果要将 _buffer 保留为数组,则必须使用适当的锁(例如使用 lock 关键字)以确保多个线程不会同时尝试访问 _buffer。 有关 ConcurrentBag 的更多信息:https://msdn.microsoft.com/en-us/library/dd381779(v=vs.110).aspx
有一个data race涉及多个线程时,其中至少有一个是writer
Socket
是线程安全的,但 SocketServer
不是 。在使用它之前,您正在写信给 _buffer
。这绝对是多线程场景中的数据竞争。每次访问共享状态都需要一个锁定机制。
如果您在传递之前立即覆盖它,那么使用 _buffer
的字段是没有意义的。如果您需要使用一个缓冲区,请在初始化时分配一次。为了避免改变太多,你可以这样实现:
class SocketServer
{
class Transaction
{
public readonly byte[] Data;
public readonly Socket Socket;
public Transaction(byte[] data, Socket socket)
{
Data = data;
Socket = socket;
}
}
private readonly object _syncObj = new object();
private readonly List<Transaction> _received = new List<Transaction>();
//...
//...
private void AcceptCallback(IAsyncResult AR)
{
//...
byte[] buffer = new byte[1024];
socket.BeginReceive(
buffer, 0, buffer.Length, SocketFlags.None,
ReceiveCallback, new Transaction(buffer, socket));
//...
}
private void ReceiveCallback(IAsyncResult AR)
{
Transaction trans = (Transaction)AR.AsyncState;
Socket socket = trans.Socket;
int bufferSize = socket.EndReceive(AR);
lock (_syncObj) {
_received.Add(trans);
}
//...
byte[] buffer = new byte[1024];
socket.BeginReceive(
buffer, 0, buffer.Length, SocketFlags.None,
ReceiveCallback, new Transaction(buffer, socket));
}
//...
// Call this to get all the received data.
// This will block ReceiveCallback until it completes.
public byte[] GetReceivedData()
{
int totalSize = 0;
lock (_syncObj) {
for (int i = 0; i < _received.Length; i++) {
totalSize += _received[i].Data.Length;
}
byte[] totalData = new byte[totalSize];
int offset = 0;
for (int i = 0; i < _received.Length; i++) {
byte[] blockData = _received[i].Data;
Buffer.BlockCopy(blockData, 0, totalData, offset, blockData.Length);
offset += blockData.Length;
}
_received.Clear();
return totalData;
}
}
}
或者,您可以创建 IList<ArraySegment<byte>>
的线程安全实现并使用适当的 overloads,但这超出了本答案的范围。
顺便提一下,您的命名约定不一致。您对字段使用下划线驼峰式大小写,对参数使用大写和帕斯卡大小写的混合,对局部变量使用驼峰式大小写。使用您想要的任何约定,但请保持一致。我建议遵循 general guidelines.