Windows 10 个插槽和 MS PPL。服务器和客户端之间的 TCP 连接在几秒后断开
Windows 10 Sockets and MS PPL. TCP connection between server and client breaks after few seconds
我的系统:
服务器:
Windows 10 专业版 1511 10586.36
Microsoft Visual Studio 2015 社区 14.0.24720.00 更新 1
客户:
Windows 10 个 IoT 核心(内部版本 10586)Raspberry Pi 2.
我的错误:
连接建立成功,但在正常工作几秒后就断开了。所以数据交换确实有效,但是在Release版本中只有几秒钟,而在Debug版本中大约需要40-60秒。
我总是得到:
t.get(); // See code below. Cancelled status.
SocketErrorStatus errorStatus = SocketError::GetStatus(e->HResult);//See code below. errorStatus == 0 (which means Unknown error)
我有时会在客户端出现这些错误,有时会在服务器端出现。
我阅读了 MS 示例 - https://code.msdn.microsoft.com/windowsapps/StreamSocket-Sample-8c573931。
我还发现了多种使用 SocketStream 对象实现异步 read/write 操作的方法 - https://github.com/Microsoft/Windows-universal-samples/tree/master/Samples/DataReaderWriter.
但是我无法获得稳定的工作。
下面是代码。
服务器端:
void NetworkHandler::Connect() {
listener->Control->KeepAlive = true;
listener->Control->NoDelay = true;
listener->Control->QualityOfService =
Windows::Networking::Sockets::SocketQualityOfService::LowLatency;
String^ localHostAddr = ref new String(L"192.168.0.10");
HostName^ localHost = ref new HostName(localHostAddr);
String^ localPort = ref new String(L"3001");
create_task(listener->BindEndpointAsync(localHost,
localPort)).then([this, localHost](task<void> previousTask) {
try {
// Try getting an exception.
previousTask.get();
String^ message = "Listening on address " +
localHost->CanonicalName;
}
catch (Platform::Exception^ exception) {
String^ message = "Start listening failed with error: " +
exception->Message;
}
});
listener->ConnectionReceived += ref new
TypedEventHandler<StreamSocketListener^,
StreamSocketListenerConnectionReceivedEventArgs^>(this,
&NetworkHandler::doAccept);
}
void NetworkHandler::doAccept(StreamSocketListener^ listener,
StreamSocketListenerConnectionReceivedEventArgs^ object) {
socket = object->Socket;
reader = ref new DataReader(socket->InputStream);
reader->InputStreamOptions = InputStreamOptions::Partial;
writer = ref new DataWriter(socket->OutputStream);
ifConnected = true;
doRead();
doWrite();
}
void NetworkHandler::doRead() {
task<UINT32>(reader->LoadAsync(sizeof(UINT32))).then([this](UINT32
size) {
if (size < sizeof(UINT32)) {
// The underlying socket was closed before we were able to read
// the whole data.
cancel_current_task();
}
UINT32 dataLength = reader->ReadUInt32();
return task<UINT32>(reader->LoadAsync(dataLength)).then([this,
dataLength](UINT32 actualDataLength) {
if (actualDataLength != dataLength) {
// The underlying socket was closed before we were able to
// read the whole data.
cancel_current_task();
}
});
}).then([this](task<void> t) {
try {
// Try getting all exceptions from the continuation chain above
// this point.
t.get();
//read data here
timer = reader->ReadDouble(); //timer
altitudeASL = reader->ReadDouble(); //altitudeASL
roll = reader->ReadDouble(); //roll
pitch = reader->ReadDouble(); //pitch
yaw = reader->ReadDouble(); //yaw
vCas = reader->ReadDouble(); //vcas
Eng0Rpm = reader->ReadDouble();
Eng1Rpm = reader->ReadDouble();
Eng2Rpm = reader->ReadDouble();
Eng3Rpm = reader->ReadDouble();
doRead(); //call doRead() again after previous call
// successfully ended.
}
catch (Platform::Exception^ e) {
// Explicitly close the socket.
SocketErrorStatus errorStatus =
SocketError::GetStatus(e->HResult);
if (errorStatus != SocketErrorStatus::Unknown) {
switch (errorStatus) {
case SocketErrorStatus::HostNotFound: {
// If hostname from user, this may indicate bad input
// set a flag to ask user to re-enter hostname
break;
}
case SocketErrorStatus::ConnectionRefused: {
// The server might be temporarily busy
break;
}
case SocketErrorStatus::NetworkIsUnreachable: {
// Could be a connectivity issue
break;
}
case SocketErrorStatus::UnreachableHost: {
// Could be a connectivity issue
break;
}
case SocketErrorStatus::NetworkIsDown: {
// Could be a connectivity issue
break;
}
default: {
// Connection failed and no options are available
// Try to use cached data if available
// may want to tell user that connect failed
break;
}
}
}
else {
// got an Hresult that is not mapped to an enum
// Could be a connectivity issue
}
ifConnected = false;
delete socket;
}
catch (task_canceled&) {
// Do not print anything here - this will usually happen
// because user closed the client socket.
// Explicitly close the socket.
ifConnected = false;
delete socket;
}
});
}
void NetworkHandler::doWrite() {
writer->WriteUInt32(4 * sizeof(DOUBLE)); //data size in bytes;
writer->WriteDouble(aileronCmd); // 1-st double, aileron;
writer->WriteDouble(elevatorCmd); //2-nd double, elevator;
writer->WriteDouble(rudderCmd); //3-rd double, rudder;
writer->WriteDouble(throttleCmd); //4-th double, throttle;
UINT32 totalMessageSize = sizeof(UINT32) + 4 * sizeof(DOUBLE);
//total message size
task<UINT32>(writer->StoreAsync()).then([this, totalMessageSize]
(UINT32 writtenBytes) {
if (writtenBytes != totalMessageSize)
cancel_current_task();
}).then([this](task<void> t) {
try {
// Try getting all exceptions from the continuation chain above
this point.
t.get();
doWrite(); //call doWrite() again after previous call
successfully ended.
}
catch (Platform::Exception^ e) {
// Explicitly close the socket.
SocketErrorStatus errorStatus =
SocketError::GetStatus(e->HResult);
if (errorStatus != SocketErrorStatus::Unknown) {
switch (errorStatus) {
case SocketErrorStatus::HostNotFound: {
// If hostname from user, this may indicate bad input
// set a flag to ask user to re-enter hostname
break;
}
case SocketErrorStatus::ConnectionRefused: {
// The server might be temporarily busy
break;
}
case SocketErrorStatus::NetworkIsUnreachable: {
// Could be a connectivity issue
break;
}
case SocketErrorStatus::UnreachableHost: {
// Could be a connectivity issue
break;
}
case SocketErrorStatus::NetworkIsDown: {
// Could be a connectivity issue
break;
}
default: {
// Connection failed and no options are available
// Try to use cached data if available
// may want to tell user that connect failed
break;
}
}
}
else {
// got an Hresult that is not mapped to an enum
// Could be a connectivity issue
}
ifConnected = false;
delete socket;
}
catch (task_canceled&) {
// Do not print anything here - this will usually happen
// because user closed the client socket.
// Explicitly close the socket.
ifConnected = false;
delete socket;
}
});
}
客户端:
void SocketBoard::Connect() {
socket = ref new StreamSocket();
socket->Control->KeepAlive = true;
socket->Control->NoDelay = true;
socket->Control->QualityOfService =
Windows::Networking::Sockets::SocketQualityOfService::LowLatency;
reader = ref new DataReader(socket->InputStream);
reader->InputStreamOptions = InputStreamOptions::Partial;
writer = ref new DataWriter(socket->OutputStream);
String^ remoteHostAddr = ref new String(L"192.168.0.10");
HostName^ remoteHost = ref new HostName(remoteHostAddr);
String^ remotePort = ref new String(L"3001");
create_task(socket->ConnectAsync(remoteHost, remotePort)).get();
ifConnected = true;
TimeSpan period;
period.Duration = 1 * 10000000; // 10,000,000 ticks per second
ThreadPoolTimer^ PeriodicTimer =
ThreadPoolTimer::CreatePeriodicTimer(ref new
TimerElapsedHandler([this](ThreadPoolTimer^ source) {
timer_sec--;
if (timer_sec <= 0)
source->Cancel();
}), period, ref new TimerDestroyedHandler([&](ThreadPoolTimer^ source)
{}));
doRead();
doWrite();
}
SocketBoard::doRead() { //same as Server }
SocketBoard::doWrite() { //same as Server }
我觉得有些东西(内部缓冲区,或者线程池)是有开销的,这导致了异常。
谁能解释一下?
我删除了
reader->InputStreamOptions = InputStreamOptions::Partial;
现在 reading/writing 操作正常。复制的时候出错了
if (actualDataLength != dataLength) {
// The underlying socket was closed before we were able to
// read the whole data.
cancel_current_task();
}
来自 MS Samples(在上面的问题中提到)。因此,当 read 是 PARTIAL 时,调用 cancel_current_task() 并因此关闭套接字,这实际上迫使相反的点 (client/server) 抛出。
我的系统:
服务器: Windows 10 专业版 1511 10586.36 Microsoft Visual Studio 2015 社区 14.0.24720.00 更新 1
客户: Windows 10 个 IoT 核心(内部版本 10586)Raspberry Pi 2.
我的错误:
连接建立成功,但在正常工作几秒后就断开了。所以数据交换确实有效,但是在Release版本中只有几秒钟,而在Debug版本中大约需要40-60秒。
我总是得到:
t.get(); // See code below. Cancelled status.
SocketErrorStatus errorStatus = SocketError::GetStatus(e->HResult);//See code below. errorStatus == 0 (which means Unknown error)
我有时会在客户端出现这些错误,有时会在服务器端出现。
我阅读了 MS 示例 - https://code.msdn.microsoft.com/windowsapps/StreamSocket-Sample-8c573931。 我还发现了多种使用 SocketStream 对象实现异步 read/write 操作的方法 - https://github.com/Microsoft/Windows-universal-samples/tree/master/Samples/DataReaderWriter.
但是我无法获得稳定的工作。 下面是代码。
服务器端:
void NetworkHandler::Connect() {
listener->Control->KeepAlive = true;
listener->Control->NoDelay = true;
listener->Control->QualityOfService =
Windows::Networking::Sockets::SocketQualityOfService::LowLatency;
String^ localHostAddr = ref new String(L"192.168.0.10");
HostName^ localHost = ref new HostName(localHostAddr);
String^ localPort = ref new String(L"3001");
create_task(listener->BindEndpointAsync(localHost,
localPort)).then([this, localHost](task<void> previousTask) {
try {
// Try getting an exception.
previousTask.get();
String^ message = "Listening on address " +
localHost->CanonicalName;
}
catch (Platform::Exception^ exception) {
String^ message = "Start listening failed with error: " +
exception->Message;
}
});
listener->ConnectionReceived += ref new
TypedEventHandler<StreamSocketListener^,
StreamSocketListenerConnectionReceivedEventArgs^>(this,
&NetworkHandler::doAccept);
}
void NetworkHandler::doAccept(StreamSocketListener^ listener,
StreamSocketListenerConnectionReceivedEventArgs^ object) {
socket = object->Socket;
reader = ref new DataReader(socket->InputStream);
reader->InputStreamOptions = InputStreamOptions::Partial;
writer = ref new DataWriter(socket->OutputStream);
ifConnected = true;
doRead();
doWrite();
}
void NetworkHandler::doRead() {
task<UINT32>(reader->LoadAsync(sizeof(UINT32))).then([this](UINT32
size) {
if (size < sizeof(UINT32)) {
// The underlying socket was closed before we were able to read
// the whole data.
cancel_current_task();
}
UINT32 dataLength = reader->ReadUInt32();
return task<UINT32>(reader->LoadAsync(dataLength)).then([this,
dataLength](UINT32 actualDataLength) {
if (actualDataLength != dataLength) {
// The underlying socket was closed before we were able to
// read the whole data.
cancel_current_task();
}
});
}).then([this](task<void> t) {
try {
// Try getting all exceptions from the continuation chain above
// this point.
t.get();
//read data here
timer = reader->ReadDouble(); //timer
altitudeASL = reader->ReadDouble(); //altitudeASL
roll = reader->ReadDouble(); //roll
pitch = reader->ReadDouble(); //pitch
yaw = reader->ReadDouble(); //yaw
vCas = reader->ReadDouble(); //vcas
Eng0Rpm = reader->ReadDouble();
Eng1Rpm = reader->ReadDouble();
Eng2Rpm = reader->ReadDouble();
Eng3Rpm = reader->ReadDouble();
doRead(); //call doRead() again after previous call
// successfully ended.
}
catch (Platform::Exception^ e) {
// Explicitly close the socket.
SocketErrorStatus errorStatus =
SocketError::GetStatus(e->HResult);
if (errorStatus != SocketErrorStatus::Unknown) {
switch (errorStatus) {
case SocketErrorStatus::HostNotFound: {
// If hostname from user, this may indicate bad input
// set a flag to ask user to re-enter hostname
break;
}
case SocketErrorStatus::ConnectionRefused: {
// The server might be temporarily busy
break;
}
case SocketErrorStatus::NetworkIsUnreachable: {
// Could be a connectivity issue
break;
}
case SocketErrorStatus::UnreachableHost: {
// Could be a connectivity issue
break;
}
case SocketErrorStatus::NetworkIsDown: {
// Could be a connectivity issue
break;
}
default: {
// Connection failed and no options are available
// Try to use cached data if available
// may want to tell user that connect failed
break;
}
}
}
else {
// got an Hresult that is not mapped to an enum
// Could be a connectivity issue
}
ifConnected = false;
delete socket;
}
catch (task_canceled&) {
// Do not print anything here - this will usually happen
// because user closed the client socket.
// Explicitly close the socket.
ifConnected = false;
delete socket;
}
});
}
void NetworkHandler::doWrite() {
writer->WriteUInt32(4 * sizeof(DOUBLE)); //data size in bytes;
writer->WriteDouble(aileronCmd); // 1-st double, aileron;
writer->WriteDouble(elevatorCmd); //2-nd double, elevator;
writer->WriteDouble(rudderCmd); //3-rd double, rudder;
writer->WriteDouble(throttleCmd); //4-th double, throttle;
UINT32 totalMessageSize = sizeof(UINT32) + 4 * sizeof(DOUBLE);
//total message size
task<UINT32>(writer->StoreAsync()).then([this, totalMessageSize]
(UINT32 writtenBytes) {
if (writtenBytes != totalMessageSize)
cancel_current_task();
}).then([this](task<void> t) {
try {
// Try getting all exceptions from the continuation chain above
this point.
t.get();
doWrite(); //call doWrite() again after previous call
successfully ended.
}
catch (Platform::Exception^ e) {
// Explicitly close the socket.
SocketErrorStatus errorStatus =
SocketError::GetStatus(e->HResult);
if (errorStatus != SocketErrorStatus::Unknown) {
switch (errorStatus) {
case SocketErrorStatus::HostNotFound: {
// If hostname from user, this may indicate bad input
// set a flag to ask user to re-enter hostname
break;
}
case SocketErrorStatus::ConnectionRefused: {
// The server might be temporarily busy
break;
}
case SocketErrorStatus::NetworkIsUnreachable: {
// Could be a connectivity issue
break;
}
case SocketErrorStatus::UnreachableHost: {
// Could be a connectivity issue
break;
}
case SocketErrorStatus::NetworkIsDown: {
// Could be a connectivity issue
break;
}
default: {
// Connection failed and no options are available
// Try to use cached data if available
// may want to tell user that connect failed
break;
}
}
}
else {
// got an Hresult that is not mapped to an enum
// Could be a connectivity issue
}
ifConnected = false;
delete socket;
}
catch (task_canceled&) {
// Do not print anything here - this will usually happen
// because user closed the client socket.
// Explicitly close the socket.
ifConnected = false;
delete socket;
}
});
}
客户端:
void SocketBoard::Connect() {
socket = ref new StreamSocket();
socket->Control->KeepAlive = true;
socket->Control->NoDelay = true;
socket->Control->QualityOfService =
Windows::Networking::Sockets::SocketQualityOfService::LowLatency;
reader = ref new DataReader(socket->InputStream);
reader->InputStreamOptions = InputStreamOptions::Partial;
writer = ref new DataWriter(socket->OutputStream);
String^ remoteHostAddr = ref new String(L"192.168.0.10");
HostName^ remoteHost = ref new HostName(remoteHostAddr);
String^ remotePort = ref new String(L"3001");
create_task(socket->ConnectAsync(remoteHost, remotePort)).get();
ifConnected = true;
TimeSpan period;
period.Duration = 1 * 10000000; // 10,000,000 ticks per second
ThreadPoolTimer^ PeriodicTimer =
ThreadPoolTimer::CreatePeriodicTimer(ref new
TimerElapsedHandler([this](ThreadPoolTimer^ source) {
timer_sec--;
if (timer_sec <= 0)
source->Cancel();
}), period, ref new TimerDestroyedHandler([&](ThreadPoolTimer^ source)
{}));
doRead();
doWrite();
}
SocketBoard::doRead() { //same as Server }
SocketBoard::doWrite() { //same as Server }
我觉得有些东西(内部缓冲区,或者线程池)是有开销的,这导致了异常。
谁能解释一下?
我删除了
reader->InputStreamOptions = InputStreamOptions::Partial;
现在 reading/writing 操作正常。复制的时候出错了
if (actualDataLength != dataLength) {
// The underlying socket was closed before we were able to
// read the whole data.
cancel_current_task();
}
来自 MS Samples(在上面的问题中提到)。因此,当 read 是 PARTIAL 时,调用 cancel_current_task() 并因此关闭套接字,这实际上迫使相反的点 (client/server) 抛出。