Windows 10 Sockets and MS PPL: TCP connection between server and client breaks after a few seconds

My system:

Server: Windows 10 Pro 1511 (10586.36), Microsoft Visual Studio Community 2015 14.0.24720.00 Update 1

Client: Windows 10 IoT Core (build 10586) on a Raspberry Pi 2.

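My problem:

The connection is established successfully, but it breaks after working normally for a few seconds. So data exchange does work, but only for a few seconds in a Release build and for about 40-60 seconds in a Debug build.

I always get: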

t.get(); // See code below. Cancelled status.
SocketErrorStatus errorStatus = SocketError::GetStatus(e->HResult);//See code below. errorStatus == 0 (which means Unknown error)

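Sometimes these errors appear on the client side, and sometimes on the server side.

I have read the MS sample at https://code.msdn.microsoft.com/windowsapps/StreamSocket-Sample-8c573931. I also found several ways of implementing asynchronous read/write operations with SocketStream objects at https://github.com/Microsoft/Windows-universal-samples/tree/master/Samples/DataReaderWriter.

But I cannot get it to work reliably. The code is below.

Server side: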

void NetworkHandler::Connect() {
    listener->Control->KeepAlive = true;
    listener->Control->NoDelay = true;
    listener->Control->QualityOfService = 
    Windows::Networking::Sockets::SocketQualityOfService::LowLatency;
    String^ localHostAddr = ref new String(L"192.168.0.10");
    HostName^ localHost = ref new HostName(localHostAddr);
    String^ localPort = ref new String(L"3001");
    create_task(listener->BindEndpointAsync(localHost,      
    localPort)).then([this, localHost](task<void> previousTask) {
        try {
            // Try getting an exception.
            previousTask.get();
            String^ message = "Listening on address " +
            localHost->CanonicalName;
        }
        catch (Platform::Exception^ exception) {
            String^ message = "Start listening failed with error: " +
            exception->Message;
        }
    });
    listener->ConnectionReceived += ref new
    TypedEventHandler<StreamSocketListener^,
    StreamSocketListenerConnectionReceivedEventArgs^>(this,
    &NetworkHandler::doAccept);
}

void NetworkHandler::doAccept(StreamSocketListener^ listener,
   StreamSocketListenerConnectionReceivedEventArgs^ object) {
   socket = object->Socket;
   reader = ref new DataReader(socket->InputStream);
   reader->InputStreamOptions = InputStreamOptions::Partial;
   writer = ref new DataWriter(socket->OutputStream);
   ifConnected = true;
   doRead();
   doWrite();
}

void NetworkHandler::doRead() {
    task<UINT32>(reader->LoadAsync(sizeof(UINT32))).then([this](UINT32
    size) {
    if (size < sizeof(UINT32)) {
        // The underlying socket was closed before we were able to read
        // the whole data.
        cancel_current_task();
    }
    UINT32 dataLength = reader->ReadUInt32();
    return task<UINT32>(reader->LoadAsync(dataLength)).then([this, 
        dataLength](UINT32 actualDataLength) {
        if (actualDataLength != dataLength) {
            // The underlying socket was closed before we were able to 
            // read the whole data.
            cancel_current_task();
        }
    });
}).then([this](task<void> t) {
    try {
        // Try getting all exceptions from the continuation chain above 
        // this point.
        t.get();
        //read data here
        timer = reader->ReadDouble(); //timer
        altitudeASL = reader->ReadDouble(); //altitudeASL
        roll = reader->ReadDouble(); //roll
        pitch = reader->ReadDouble(); //pitch
        yaw = reader->ReadDouble(); //yaw
        vCas = reader->ReadDouble(); //vcas
        Eng0Rpm = reader->ReadDouble(); 
        Eng1Rpm = reader->ReadDouble();
        Eng2Rpm = reader->ReadDouble();
        Eng3Rpm = reader->ReadDouble();
        doRead();  //call doRead() again after previous call 
        // successfully ended.
    }
    catch (Platform::Exception^ e) {
        // Explicitly close the socket.
        SocketErrorStatus errorStatus =  
        SocketError::GetStatus(e->HResult);
        if (errorStatus != SocketErrorStatus::Unknown) {
            switch (errorStatus) {
                case SocketErrorStatus::HostNotFound: {
                    // If hostname from user, this may indicate bad input
                    // set a flag to ask user to re-enter hostname
                    break;
                }
                case SocketErrorStatus::ConnectionRefused: {
                    // The server might be temporarily busy
                    break;
                }
                case SocketErrorStatus::NetworkIsUnreachable: {
                    // Could be a connectivity issue
                    break;
                }
                case SocketErrorStatus::UnreachableHost: {
                    // Could be a connectivity issue
                    break;
                }
                case SocketErrorStatus::NetworkIsDown: {
                    // Could be a connectivity issue
                    break;
                }
                default: {
                    // Connection failed and no options are available
                    // Try to use cached data if available 
                    // may want to tell user that connect failed
                    break;
                }
            }
        }
        else {
            // got an Hresult that is not mapped to an enum
            // Could be a connectivity issue
        }
        ifConnected = false;
        delete socket;
    }
    catch (task_canceled&) {
        // Do not print anything here - this will usually happen  
        // because user closed the client socket.
        // Explicitly close the socket.
        ifConnected = false;
        delete socket;
    }
});
}

void NetworkHandler::doWrite() {
    writer->WriteUInt32(4 * sizeof(DOUBLE)); //data size in bytes;
    writer->WriteDouble(aileronCmd);  // 1-st double, aileron;
    writer->WriteDouble(elevatorCmd); //2-nd double, elevator;
    writer->WriteDouble(rudderCmd); //3-rd double, rudder;
    writer->WriteDouble(throttleCmd); //4-th double, throttle;

    UINT32 totalMessageSize = sizeof(UINT32) + 4 * sizeof(DOUBLE);
    //total message size

    task<UINT32>(writer->StoreAsync()).then([this, totalMessageSize] 
    (UINT32 writtenBytes) {
    if (writtenBytes != totalMessageSize)
        cancel_current_task();
    }).then([this](task<void> t) {
    try {
        // Try getting all exceptions from the continuation chain above
        // this point.
        t.get();
        doWrite(); // call doWrite() again after the previous call
        // successfully ended.
    }
    catch (Platform::Exception^ e) {
        // Explicitly close the socket.
        SocketErrorStatus errorStatus =
        SocketError::GetStatus(e->HResult);
        if (errorStatus != SocketErrorStatus::Unknown) {
            switch (errorStatus) {
                case SocketErrorStatus::HostNotFound: {
                    // If hostname from user, this may indicate bad input
                    // set a flag to ask user to re-enter hostname
                    break;
                }
                case SocketErrorStatus::ConnectionRefused: {
                    // The server might be temporarily busy
                    break;
                }
                case SocketErrorStatus::NetworkIsUnreachable: {
                    // Could be a connectivity issue
                    break;
                }
                case SocketErrorStatus::UnreachableHost: {
                    // Could be a connectivity issue
                    break;
                }
                case SocketErrorStatus::NetworkIsDown: {
                    // Could be a connectivity issue
                    break;
                }
                default: {
                    // Connection failed and no options are available
                    // Try to use cached data if available 
                    // may want to tell user that connect failed
                    break;
                }
            }
        }
        else {
            // got an Hresult that is not mapped to an enum
            // Could be a connectivity issue
        }
        ifConnected = false;
        delete socket;
    }
    catch (task_canceled&) {
            // Do not print anything here - this will usually happen
            // because user closed the client socket.
            // Explicitly close the socket.
            ifConnected = false;
            delete socket;
        }
    });
}
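
For reference, the message that doWrite() queues is a 4-byte length prefix followed by four doubles, 36 bytes in total. The following is a minimal sketch of that frame in portable C++ (no WinRT). It assumes big-endian byte order, which I believe is the DataWriter default but have not verified here, and the helper names (append_u32_be, append_f64_be, encode_command_frame, read_u32_be) are invented for this illustration.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Appends a 32-bit unsigned integer, most significant byte first.
// Assumption: the sender uses big-endian byte order.
void append_u32_be(std::vector<std::uint8_t>& buf, std::uint32_t v) {
    for (int shift = 24; shift >= 0; shift -= 8)
        buf.push_back(static_cast<std::uint8_t>((v >> shift) & 0xFF));
}

// Appends the IEEE-754 bit pattern of a double, most significant byte first.
void append_f64_be(std::vector<std::uint8_t>& buf, double d) {
    std::uint64_t bits = 0;
    static_assert(sizeof(bits) == sizeof(d), "double must be 64 bits wide");
    std::memcpy(&bits, &d, sizeof(bits));
    for (int shift = 56; shift >= 0; shift -= 8)
        buf.push_back(static_cast<std::uint8_t>((bits >> shift) & 0xFF));
}

// Builds one frame: [payload length in bytes][aileron][elevator][rudder][throttle].
std::vector<std::uint8_t> encode_command_frame(const std::array<double, 4>& cmds) {
    std::vector<std::uint8_t> frame;
    append_u32_be(frame, static_cast<std::uint32_t>(cmds.size() * sizeof(double)));
    for (double c : cmds)
        append_f64_be(frame, c);
    return frame;
}

// Decodes the length prefix back out of the first four bytes of a frame.
std::uint32_t read_u32_be(const std::uint8_t* p) {
    return (static_cast<std::uint32_t>(p[0]) << 24) |
           (static_cast<std::uint32_t>(p[1]) << 16) |
           (static_cast<std::uint32_t>(p[2]) << 8) |
           static_cast<std::uint32_t>(p[3]);
}
```

The receiver first needs the 4-byte prefix, then exactly as many payload bytes as the prefix announces (32 here), which is what the two LoadAsync calls in doRead() are trying to do.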

Client side:

void SocketBoard::Connect() {
    socket = ref new StreamSocket();
    socket->Control->KeepAlive = true;
    socket->Control->NoDelay = true;
    socket->Control->QualityOfService =  
    Windows::Networking::Sockets::SocketQualityOfService::LowLatency;
    reader = ref new DataReader(socket->InputStream);
    reader->InputStreamOptions = InputStreamOptions::Partial;
    writer = ref new DataWriter(socket->OutputStream);
    String^ remoteHostAddr = ref new String(L"192.168.0.10");
    HostName^ remoteHost = ref new HostName(remoteHostAddr);
    String^ remotePort = ref new String(L"3001");
    create_task(socket->ConnectAsync(remoteHost, remotePort)).get();
    ifConnected = true;
    TimeSpan period;
    period.Duration = 1 * 10000000; // 10,000,000 ticks per second
    ThreadPoolTimer^ PeriodicTimer =
    ThreadPoolTimer::CreatePeriodicTimer(ref new
    TimerElapsedHandler([this](ThreadPoolTimer^ source) {
    timer_sec--;
    if (timer_sec <= 0)
        source->Cancel();
}), period, ref new TimerDestroyedHandler([&](ThreadPoolTimer^ source)  
{}));
   doRead();
   doWrite();
}

void SocketBoard::doRead() { /* same as the server */ }
void SocketBoard::doWrite() { /* same as the server */ }

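I suspect that something (an internal buffer, or the thread pool) is being overloaded and that this causes the exception.

Can anyone explain this?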

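I removed

reader->InputStreamOptions = InputStreamOptions::Partial;

and the read/write operations now work correctly. The mistake was in copying

if (actualDataLength != dataLength) {
    // The underlying socket was closed before we were able to 
    // read the whole data.
    cancel_current_task();
}

from the MS samples (mentioned in the question above). With PARTIAL reads, LoadAsync can complete with fewer bytes than requested, so this check calls cancel_current_task() and the socket gets closed, which in turn forces the other endpoint (client or server) to throw.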
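
To illustrate why that length check misfires, here is a minimal sketch in portable C++ (no WinRT). It assumes a partial read behaves like a stream that may hand back fewer bytes than were requested. ChunkedStream and read_exact are hypothetical names made up for this illustration; they are not part of the Windows API. A short read is treated as "keep reading", and only a zero-byte read is treated as a closed connection.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket input stream in "partial" mode:
// every read returns at most max_chunk bytes, even if more were requested.
class ChunkedStream {
public:
    ChunkedStream(std::vector<std::uint8_t> data, std::size_t max_chunk)
        : data_(std::move(data)), max_chunk_(max_chunk) {}

    // Copies up to `wanted` bytes into `out` and returns how many were copied.
    // Returns 0 only when nothing is left, which models a closed connection.
    std::size_t read(std::uint8_t* out, std::size_t wanted) {
        assert(pos_ <= data_.size());
        const std::size_t remaining = data_.size() - pos_;
        const std::size_t n = std::min({wanted, max_chunk_, remaining});
        std::copy_n(data_.data() + pos_, n, out);
        pos_ += n;
        return n;
    }

private:
    std::vector<std::uint8_t> data_;
    std::size_t max_chunk_;
    std::size_t pos_ = 0;
};

// Keeps reading until exactly `wanted` bytes have arrived.
// Returns false only if the stream ends before the message is complete.
bool read_exact(ChunkedStream& stream, std::vector<std::uint8_t>& out,
                std::size_t wanted) {
    out.resize(wanted);
    std::size_t got = 0;
    while (got < wanted) {
        const std::size_t n = stream.read(out.data() + got, wanted - got);
        if (n == 0)
            return false;  // stream ended early: a genuine error
        got += n;          // short read: not an error, just ask for the rest
    }
    return true;
}
```

With a 36-byte message and a stream that delivers 5 bytes at a time, a single read returns 5, which the copied check would treat as a failure, while read_exact still assembles the whole message. Removing the Partial option, as I did, has a similar effect because the load then waits for the requested byte count.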