使用多线程通过 NetworkStream 发送数据
Sending data over NetworkStream using multiple threads
我正在尝试构建一个命令行聊天室,其中服务器正在处理连接并将一个客户端的输入重复回所有其他客户端。
目前,服务器能够接收来自多个客户端的输入,但只能将信息单独发送回这些客户端。我认为我的问题是每个连接都在一个单独的线程上处理。我将如何允许线程相互通信或能够向每个线程发送数据?
服务器代码:
namespace ConsoleApplication
{
class TcpHelper
{
private static object _lock = new object();
private static List<Task> _connections = new List<Task>();
private static TcpListener listener { get; set; }
private static bool accept { get; set; } = false;
private static Task StartListener()
{
return Task.Run(async () =>
{
IPAddress address = IPAddress.Parse("127.0.0.1");
int port = 5678;
listener = new TcpListener(address, port);
listener.Start();
Console.WriteLine($"Server started. Listening to TCP clients at 127.0.0.1:{port}");
while (true)
{
var tcpClient = await listener.AcceptTcpClientAsync();
Console.WriteLine("Client has connected");
var task = StartHandleConnectionAsync(tcpClient);
if (task.IsFaulted)
task.Wait();
}
});
}
// Register and handle the connection
private static async Task StartHandleConnectionAsync(TcpClient tcpClient)
{
// start the new connection task
var connectionTask = HandleConnectionAsync(tcpClient);
// add it to the list of pending task
lock (_lock)
_connections.Add(connectionTask);
// catch all errors of HandleConnectionAsync
try
{
await connectionTask;
}
catch (Exception ex)
{
// log the error
Console.WriteLine(ex.ToString());
}
finally
{
// remove pending task
lock (_lock)
_connections.Remove(connectionTask);
}
}
private static async Task HandleConnectionAsync(TcpClient client)
{
await Task.Yield();
{
using (var networkStream = client.GetStream())
{
if (client != null)
{
Console.WriteLine("Client connected. Waiting for data.");
StreamReader streamreader = new StreamReader(networkStream);
StreamWriter streamwriter = new StreamWriter(networkStream);
string clientmessage = "";
string servermessage = "";
while (clientmessage != null && clientmessage != "quit")
{
clientmessage = await streamreader.ReadLineAsync();
Console.WriteLine(clientmessage);
servermessage = clientmessage;
streamwriter.WriteLine(servermessage);
streamwriter.Flush();
}
Console.WriteLine("Closing connection.");
networkStream.Dispose();
}
}
}
}
public static void Main(string[] args)
{
// Start the server
Console.WriteLine("Hit Ctrl-C to close the chat server");
TcpHelper.StartListener().Wait();
}
}
}
客户代码:
namespace Client2
{
public class Program
{
private static void clientConnect()
{
TcpClient socketForServer = new TcpClient();
bool status = true;
string userName;
Console.Write("Input Username: ");
userName = Console.ReadLine();
try
{
IPAddress address = IPAddress.Parse("127.0.0.1");
socketForServer.ConnectAsync(address, 5678);
Console.WriteLine("Connected to Server");
}
catch
{
Console.WriteLine("Failed to Connect to server{0}:999", "localhost");
return;
}
NetworkStream networkStream = socketForServer.GetStream();
StreamReader streamreader = new StreamReader(networkStream);
StreamWriter streamwriter = new StreamWriter(networkStream);
try
{
string clientmessage = "";
string servermessage = "";
while (status)
{
Console.Write(userName + ": ");
clientmessage = Console.ReadLine();
if ((clientmessage == "quit") || (clientmessage == "QUIT"))
{
status = false;
streamwriter.WriteLine("quit");
streamwriter.WriteLine(userName + " has left the conversation");
streamwriter.Flush();
}
if ((clientmessage != "quit") && (clientmessage != "quit"))
{
streamwriter.WriteLine(userName + ": " + clientmessage);
streamwriter.Flush();
servermessage = streamreader.ReadLine();
Console.WriteLine("Server:" + servermessage);
}
}
}
catch
{
Console.WriteLine("Exception reading from the server");
}
streamreader.Dispose();
networkStream.Dispose();
streamwriter.Dispose();
}
public static void Main(string[] args)
{
clientConnect();
}
}
}
您的代码中的主要错误是您没有尝试将从一个客户端接收到的数据发送到其他连接的客户端。您的服务器中有 _connections
列表,但列表中存储的唯一内容是连接的 Task
对象,您甚至不对这些对象执行任何操作。
相反,您应该自己维护一个连接列表,这样当您收到来自一个客户端的消息时,您就可以将该消息重新传输给其他客户端。
至少,这应该是 List<TcpClient>
,但是因为您使用的是 StreamReader
和 StreamWriter
,所以您需要将这些对象初始化并存储在列表中出色地。此外,您应该包括一个客户端标识符。一个明显的选择是客户端的名称(即用户输入的名称),但是您的示例没有在聊天协议中提供任何机制来传输该标识作为连接初始化的一部分,所以在我的示例(下)我只使用一个简单的整数值。
您发布的代码中还有其他一些违规行为,例如:
- 在一个全新的线程中启动一个任务,只是为了执行一些语句,让您启动一个异步操作。在我的示例中,我只是省略了代码的
Task.Run()
部分,因为它不需要。
- 在为
IsFaulted
返回时检查特定于连接的任务。由于在返回此 Task
对象时,任何 I/O 都不太可能实际发生,因此此逻辑几乎没有用处。对 Wait()
的调用将抛出异常,该异常将传播到主线程的 Wait()
调用,从而终止服务器。但是您不会在发生任何其他错误时终止服务器,因此不清楚您为什么要在此处执行此操作。
- 有一个对
Task.Yield()
的虚假调用。我不知道你想在那里完成什么,但不管它是什么,该声明都没有用。我只是删除了它。
- 在您的客户端代码中,您仅在发送数据后尝试从服务器接收数据。这是非常错误的;您希望客户响应并在数据发送给他们后立即接收数据。在我的版本中,我包含了一个简单的小匿名方法,它会立即被调用以启动一个单独的消息接收循环,该循环将与主用户输入循环异步并发执行。
- 同样在客户端代码中,您在 之后 发送了 "quit" 消息,这将导致服务器关闭连接。这意味着服务器实际上永远不会收到“……已经离开……”消息。我颠倒了消息的顺序,因此 "quit" 始终是客户端发送的最后一件事。
我的版本是这样的:
服务器:
class TcpHelper
{
class ClientData : IDisposable
{
private static int _nextId;
public int ID { get; private set; }
public TcpClient Client { get; private set; }
public TextReader Reader { get; private set; }
public TextWriter Writer { get; private set; }
public ClientData(TcpClient client)
{
ID = _nextId++;
Client = client;
NetworkStream stream = client.GetStream();
Reader = new StreamReader(stream);
Writer = new StreamWriter(stream);
}
public void Dispose()
{
Writer.Close();
Reader.Close();
Client.Close();
}
}
private static readonly object _lock = new object();
private static readonly List<ClientData> _connections = new List<ClientData>();
private static TcpListener listener { get; set; }
private static bool accept { get; set; }
public static async Task StartListener()
{
IPAddress address = IPAddress.Any;
int port = 5678;
listener = new TcpListener(address, port);
listener.Start();
Console.WriteLine("Server started. Listening to TCP clients on port {0}", port);
while (true)
{
var tcpClient = await listener.AcceptTcpClientAsync();
Console.WriteLine("Client has connected");
var task = StartHandleConnectionAsync(tcpClient);
if (task.IsFaulted)
task.Wait();
}
}
// Register and handle the connection
private static async Task StartHandleConnectionAsync(TcpClient tcpClient)
{
ClientData clientData = new ClientData(tcpClient);
lock (_lock) _connections.Add(clientData);
// catch all errors of HandleConnectionAsync
try
{
await HandleConnectionAsync(clientData);
}
catch (Exception ex)
{
// log the error
Console.WriteLine(ex.ToString());
}
finally
{
lock (_lock) _connections.Remove(clientData);
clientData.Dispose();
}
}
private static async Task HandleConnectionAsync(ClientData clientData)
{
Console.WriteLine("Client connected. Waiting for data.");
string clientmessage;
while ((clientmessage = await clientData.Reader.ReadLineAsync()) != null && clientmessage != "quit")
{
string message = "From " + clientData.ID + ": " + clientmessage;
Console.WriteLine(message);
lock (_lock)
{
// Locking the entire operation ensures that a) none of the client objects
// are disposed before we can write to them, and b) all of the chat messages
// are received in the same order by all clients.
foreach (ClientData recipient in _connections.Where(r => r.ID != clientData.ID))
{
recipient.Writer.WriteLine(message);
recipient.Writer.Flush();
}
}
}
Console.WriteLine("Closing connection.");
}
}
客户:
class Program
{
private const int _kport = 5678;
private static async Task clientConnect()
{
IPAddress address = IPAddress.Loopback;
TcpClient socketForServer = new TcpClient();
string userName;
Console.Write("Input Username: ");
userName = Console.ReadLine();
try
{
await socketForServer.ConnectAsync(address, _kport);
Console.WriteLine("Connected to Server");
}
catch (Exception e)
{
Console.WriteLine("Failed to Connect to server {0}:{1}", address, _kport);
return;
}
using (NetworkStream networkStream = socketForServer.GetStream())
{
var readTask = ((Func<Task>)(async () =>
{
using (StreamReader reader = new StreamReader(networkStream))
{
string receivedText;
while ((receivedText = await reader.ReadLineAsync()) != null)
{
Console.WriteLine("Server:" + receivedText);
}
}
}))();
using (StreamWriter streamwriter = new StreamWriter(networkStream))
{
try
{
while (true)
{
Console.Write(userName + ": ");
string clientmessage = Console.ReadLine();
if ((clientmessage == "quit") || (clientmessage == "QUIT"))
{
streamwriter.WriteLine(userName + " has left the conversation");
streamwriter.WriteLine("quit");
streamwriter.Flush();
break;
}
else
{
streamwriter.WriteLine(userName + ": " + clientmessage);
streamwriter.Flush();
}
}
await readTask;
}
catch (Exception e)
{
Console.WriteLine("Exception writing to server: " + e);
throw;
}
}
}
}
public static void Main(string[] args)
{
clientConnect().Wait();
}
}
您还有很多工作要做。您可能希望在服务器端正确初始化聊天用户名。至少,对于真实世界的代码,您需要进行更多的错误检查,并确保可靠地生成客户端 ID(如果您只想要正 ID 值,则不能超过 2^31-1在它回滚到 0
) 之前连接。
我还做了一些并非绝对必要的其他小改动,例如使用 IPAddress.Any
和 IPAddress.Loopback
值而不是解析字符串,并且只是总体上简化和清理了此处的代码在那里。另外,我目前没有使用 C# 6 编译器,所以我更改了您使用 C# 6 功能的代码,以便它可以使用 C# 5 编译。
要构建一个成熟的聊天服务器,您还有很多工作要做。但我希望以上内容能让你回到正确的轨道上。
我正在尝试构建一个命令行聊天室,其中服务器正在处理连接并将一个客户端的输入重复回所有其他客户端。 目前,服务器能够接收来自多个客户端的输入,但只能将信息单独发送回这些客户端。我认为我的问题是每个连接都在一个单独的线程上处理。我将如何允许线程相互通信或能够向每个线程发送数据?
服务器代码:
namespace ConsoleApplication
{
class TcpHelper
{
private static object _lock = new object();
private static List<Task> _connections = new List<Task>();
private static TcpListener listener { get; set; }
private static bool accept { get; set; } = false;
private static Task StartListener()
{
return Task.Run(async () =>
{
IPAddress address = IPAddress.Parse("127.0.0.1");
int port = 5678;
listener = new TcpListener(address, port);
listener.Start();
Console.WriteLine($"Server started. Listening to TCP clients at 127.0.0.1:{port}");
while (true)
{
var tcpClient = await listener.AcceptTcpClientAsync();
Console.WriteLine("Client has connected");
var task = StartHandleConnectionAsync(tcpClient);
if (task.IsFaulted)
task.Wait();
}
});
}
// Register and handle the connection
private static async Task StartHandleConnectionAsync(TcpClient tcpClient)
{
// start the new connection task
var connectionTask = HandleConnectionAsync(tcpClient);
// add it to the list of pending task
lock (_lock)
_connections.Add(connectionTask);
// catch all errors of HandleConnectionAsync
try
{
await connectionTask;
}
catch (Exception ex)
{
// log the error
Console.WriteLine(ex.ToString());
}
finally
{
// remove pending task
lock (_lock)
_connections.Remove(connectionTask);
}
}
private static async Task HandleConnectionAsync(TcpClient client)
{
await Task.Yield();
{
using (var networkStream = client.GetStream())
{
if (client != null)
{
Console.WriteLine("Client connected. Waiting for data.");
StreamReader streamreader = new StreamReader(networkStream);
StreamWriter streamwriter = new StreamWriter(networkStream);
string clientmessage = "";
string servermessage = "";
while (clientmessage != null && clientmessage != "quit")
{
clientmessage = await streamreader.ReadLineAsync();
Console.WriteLine(clientmessage);
servermessage = clientmessage;
streamwriter.WriteLine(servermessage);
streamwriter.Flush();
}
Console.WriteLine("Closing connection.");
networkStream.Dispose();
}
}
}
}
public static void Main(string[] args)
{
// Start the server
Console.WriteLine("Hit Ctrl-C to close the chat server");
TcpHelper.StartListener().Wait();
}
}
}
客户代码:
namespace Client2
{
public class Program
{
private static void clientConnect()
{
TcpClient socketForServer = new TcpClient();
bool status = true;
string userName;
Console.Write("Input Username: ");
userName = Console.ReadLine();
try
{
IPAddress address = IPAddress.Parse("127.0.0.1");
socketForServer.ConnectAsync(address, 5678);
Console.WriteLine("Connected to Server");
}
catch
{
Console.WriteLine("Failed to Connect to server{0}:999", "localhost");
return;
}
NetworkStream networkStream = socketForServer.GetStream();
StreamReader streamreader = new StreamReader(networkStream);
StreamWriter streamwriter = new StreamWriter(networkStream);
try
{
string clientmessage = "";
string servermessage = "";
while (status)
{
Console.Write(userName + ": ");
clientmessage = Console.ReadLine();
if ((clientmessage == "quit") || (clientmessage == "QUIT"))
{
status = false;
streamwriter.WriteLine("quit");
streamwriter.WriteLine(userName + " has left the conversation");
streamwriter.Flush();
}
if ((clientmessage != "quit") && (clientmessage != "quit"))
{
streamwriter.WriteLine(userName + ": " + clientmessage);
streamwriter.Flush();
servermessage = streamreader.ReadLine();
Console.WriteLine("Server:" + servermessage);
}
}
}
catch
{
Console.WriteLine("Exception reading from the server");
}
streamreader.Dispose();
networkStream.Dispose();
streamwriter.Dispose();
}
public static void Main(string[] args)
{
clientConnect();
}
}
}
您的代码中的主要错误是您没有尝试将从一个客户端接收到的数据发送到其他连接的客户端。您的服务器中有 _connections
列表,但列表中存储的唯一内容是连接的 Task
对象,您甚至不对这些对象执行任何操作。
相反,您应该自己维护一个连接列表,这样当您收到来自一个客户端的消息时,您就可以将该消息重新传输给其他客户端。
至少,这应该是 List<TcpClient>
,但是因为您使用的是 StreamReader
和 StreamWriter
,所以您需要将这些对象初始化并存储在列表中出色地。此外,您应该包括一个客户端标识符。一个明显的选择是客户端的名称(即用户输入的名称),但是您的示例没有在聊天协议中提供任何机制来传输该标识作为连接初始化的一部分,所以在我的示例(下)我只使用一个简单的整数值。
您发布的代码中还有其他一些违规行为,例如:
- 在一个全新的线程中启动一个任务,只是为了执行一些语句,让您启动一个异步操作。在我的示例中,我只是省略了代码的
Task.Run()
部分,因为它不需要。 - 在为
IsFaulted
返回时检查特定于连接的任务。由于在返回此Task
对象时,任何 I/O 都不太可能实际发生,因此此逻辑几乎没有用处。对Wait()
的调用将抛出异常,该异常将传播到主线程的Wait()
调用,从而终止服务器。但是您不会在发生任何其他错误时终止服务器,因此不清楚您为什么要在此处执行此操作。 - 有一个对
Task.Yield()
的虚假调用。我不知道你想在那里完成什么,但不管它是什么,该声明都没有用。我只是删除了它。 - 在您的客户端代码中,您仅在发送数据后尝试从服务器接收数据。这是非常错误的;您希望客户响应并在数据发送给他们后立即接收数据。在我的版本中,我包含了一个简单的小匿名方法,它会立即被调用以启动一个单独的消息接收循环,该循环将与主用户输入循环异步并发执行。
- 同样在客户端代码中,您在 之后 发送了 "quit" 消息,这将导致服务器关闭连接。这意味着服务器实际上永远不会收到“……已经离开……”消息。我颠倒了消息的顺序,因此 "quit" 始终是客户端发送的最后一件事。
我的版本是这样的:
服务器:
class TcpHelper
{
class ClientData : IDisposable
{
private static int _nextId;
public int ID { get; private set; }
public TcpClient Client { get; private set; }
public TextReader Reader { get; private set; }
public TextWriter Writer { get; private set; }
public ClientData(TcpClient client)
{
ID = _nextId++;
Client = client;
NetworkStream stream = client.GetStream();
Reader = new StreamReader(stream);
Writer = new StreamWriter(stream);
}
public void Dispose()
{
Writer.Close();
Reader.Close();
Client.Close();
}
}
private static readonly object _lock = new object();
private static readonly List<ClientData> _connections = new List<ClientData>();
private static TcpListener listener { get; set; }
private static bool accept { get; set; }
public static async Task StartListener()
{
IPAddress address = IPAddress.Any;
int port = 5678;
listener = new TcpListener(address, port);
listener.Start();
Console.WriteLine("Server started. Listening to TCP clients on port {0}", port);
while (true)
{
var tcpClient = await listener.AcceptTcpClientAsync();
Console.WriteLine("Client has connected");
var task = StartHandleConnectionAsync(tcpClient);
if (task.IsFaulted)
task.Wait();
}
}
// Register and handle the connection
private static async Task StartHandleConnectionAsync(TcpClient tcpClient)
{
ClientData clientData = new ClientData(tcpClient);
lock (_lock) _connections.Add(clientData);
// catch all errors of HandleConnectionAsync
try
{
await HandleConnectionAsync(clientData);
}
catch (Exception ex)
{
// log the error
Console.WriteLine(ex.ToString());
}
finally
{
lock (_lock) _connections.Remove(clientData);
clientData.Dispose();
}
}
private static async Task HandleConnectionAsync(ClientData clientData)
{
Console.WriteLine("Client connected. Waiting for data.");
string clientmessage;
while ((clientmessage = await clientData.Reader.ReadLineAsync()) != null && clientmessage != "quit")
{
string message = "From " + clientData.ID + ": " + clientmessage;
Console.WriteLine(message);
lock (_lock)
{
// Locking the entire operation ensures that a) none of the client objects
// are disposed before we can write to them, and b) all of the chat messages
// are received in the same order by all clients.
foreach (ClientData recipient in _connections.Where(r => r.ID != clientData.ID))
{
recipient.Writer.WriteLine(message);
recipient.Writer.Flush();
}
}
}
Console.WriteLine("Closing connection.");
}
}
客户:
class Program
{
private const int _kport = 5678;
private static async Task clientConnect()
{
IPAddress address = IPAddress.Loopback;
TcpClient socketForServer = new TcpClient();
string userName;
Console.Write("Input Username: ");
userName = Console.ReadLine();
try
{
await socketForServer.ConnectAsync(address, _kport);
Console.WriteLine("Connected to Server");
}
catch (Exception e)
{
Console.WriteLine("Failed to Connect to server {0}:{1}", address, _kport);
return;
}
using (NetworkStream networkStream = socketForServer.GetStream())
{
var readTask = ((Func<Task>)(async () =>
{
using (StreamReader reader = new StreamReader(networkStream))
{
string receivedText;
while ((receivedText = await reader.ReadLineAsync()) != null)
{
Console.WriteLine("Server:" + receivedText);
}
}
}))();
using (StreamWriter streamwriter = new StreamWriter(networkStream))
{
try
{
while (true)
{
Console.Write(userName + ": ");
string clientmessage = Console.ReadLine();
if ((clientmessage == "quit") || (clientmessage == "QUIT"))
{
streamwriter.WriteLine(userName + " has left the conversation");
streamwriter.WriteLine("quit");
streamwriter.Flush();
break;
}
else
{
streamwriter.WriteLine(userName + ": " + clientmessage);
streamwriter.Flush();
}
}
await readTask;
}
catch (Exception e)
{
Console.WriteLine("Exception writing to server: " + e);
throw;
}
}
}
}
public static void Main(string[] args)
{
clientConnect().Wait();
}
}
您还有很多工作要做。您可能希望在服务器端正确初始化聊天用户名。至少,对于真实世界的代码,您需要进行更多的错误检查,并确保可靠地生成客户端 ID(如果您只想要正 ID 值,则不能超过 2^31-1在它回滚到 0
) 之前连接。
我还做了一些并非绝对必要的其他小改动,例如使用 IPAddress.Any
和 IPAddress.Loopback
值而不是解析字符串,并且只是总体上简化和清理了此处的代码在那里。另外,我目前没有使用 C# 6 编译器,所以我更改了您使用 C# 6 功能的代码,以便它可以使用 C# 5 编译。
要构建一个成熟的聊天服务器,您还有很多工作要做。但我希望以上内容能让你回到正确的轨道上。