在 Swift 4 中从 InputStream 中恰好读取 n 个字节

Read exactly n bytes from InputStream in Swift 4

我有一台通过 TCP 向我发送消息的服务器,其中前 4 个字节决定消息其余部分的长度。所以我需要

1) 将 4 个字节读入 UInt32(有效)并将其存储到 bytes_expected

2) 将 bytes_expected 字节读入 message

现在我的代码如下所示:

// Queue on which handleInput() wraps its stream reads via `sync(flags: .barrier)`.
// NOTE(review): the queue is created without the `.concurrent` attribute, so per the
// Dispatch documentation it should be a serial queue — confirm whether the barrier flag
// has any effect here.
private let inputStreamAccessQueue  = DispatchQueue(label: "SynchronizedInputStreamAccess")

/// Dispatches a stream event to the matching handler.
///
/// - Parameter event: The event reported for the stream. When it is
///   `Stream.Event.hasBytesAvailable`, `handleInput()` is called.
func inputStreamHandler(_ event: Stream.Event) {
    switch event {
        case Stream.Event.hasBytesAvailable:
            self.handleInput()

        // The remaining cases are elided in this excerpt.
        ...
    }
}

/// Reads one length-prefixed message from `inputStream` and hands it to `handleMessage`.
///
/// The message is read as a 4-byte big-endian length followed by that many bytes of
/// UTF-8 text. The whole read runs inside `inputStreamAccessQueue.sync(flags: .barrier)`.
///
/// Failures (missing stream, no bytes available, short length prefix, read error/EOF in
/// the middle of the payload, payload that is not valid UTF-8) are logged or reported
/// through `errorHandler` and end the call without delivering a message.
func handleInput() {
    // **QUESTION: Do I use this barrier wrong?**
    self.inputStreamAccessQueue.sync(flags: .barrier) {
        guard let istr = self.inputStream else {
            log.error(self.buildLogMessage("InputStream is nil"))
            return
        }

        guard istr.hasBytesAvailable else {
            log.error(self.buildLogMessage("handleInput() called when inputstream has no bytes available"))
            return
        }

        // The length prefix lives in a plain array, so nothing has to be deallocated by hand.
        var lengthBytes = [UInt8](repeating: 0, count: 4)
        let lenbytes_read = istr.read(&lengthBytes, maxLength: 4)

        guard lenbytes_read == 4 else {
            self.errorHandler(NetworkingError.InputError("Input Stream received \(lenbytes_read) (!=4) bytes"))
            return
        }

        // Interpret the four bytes as a big-endian UInt32 without an unaligned raw load.
        let bytes_expected = Int(lengthBytes.reduce(UInt32(0)) { ($0 << 8) | UInt32($1) })
        log.info(self.buildLogMessage("expect \(bytes_expected) bytes"))

        print("::DEBUG", call, "bytes_expected", bytes_expected)

        // Collect the raw payload first and decode it once at the end: a multi-byte UTF-8
        // character may be split across two reads, so decoding chunk by chunk can fail.
        var payload = Data()
        // Read through a bounded scratch buffer instead of allocating `bytes_expected`
        // bytes up front; memory then only grows with data that actually arrived.
        var chunk = [UInt8](repeating: 0, count: min(bytes_expected, 4096))
        var bytes_missing = bytes_expected
        while bytes_missing > 0 {
            let bytes_read = istr.read(&chunk, maxLength: min(bytes_missing, chunk.count))

            print("::DEBUG", call, "bytes_read", bytes_read)

            // 0 means end of stream, a negative value means a read error.
            guard bytes_read > 0 else {
                print("bytes_read not > 0: \(bytes_read)")
                return
            }

            payload.append(contentsOf: chunk[..<bytes_read])
            bytes_missing -= bytes_read
        }

        guard let message = String(data: payload, encoding: .utf8) else {
            log.error("ERROR WHEN READING")
            return
        }

        self.handleMessage(message)
    }
}

我的问题是 istr.read(buffer, maxLength: bytes_missing) 有时不会一次读完整条消息,所以我循环读取,直到读完所有需要的字节。但我的应用程序仍然会(偶尔)崩溃,因为在对 handleInput() 的一次调用仍在运行时,该方法又被再次调用。在这种情况下,bytes_expected 包含随机值,应用程序因非法内存分配而崩溃。

我以为我可以通过使用屏障来避免这种情况。但是好像这样不行。。。我是不是用错了barrier?

我的建议是不要对抗网络 I/O 的异步特性。每当收到 Stream.Event.hasBytesAvailable 事件时,就读取数据并将其收集到缓冲区中。如果缓冲区中已包含足够的数据(4 个长度字节加上预期的消息长度),就处理这些数据并将其从缓冲区中移除;否则什么也不做,等待更多数据到达。

以下(未经测试)代码仅供演示。 它仅显示与该特定问题相关的部分。 为简洁起见,省略了初始化、事件处理程序等。

/// Collects bytes from an input stream and splits them into messages, each framed as a
/// 4-byte big-endian length followed by that many payload bytes.
class MessageReader {

    var buffer = Data(count: 1024) // Capacity must cover the largest message plus its 4-byte length
    var bytesRead = 0 // Count of valid bytes currently stored at the front of `buffer`

    /// Receives the payload of one complete message from `handleInput`.
    func processMessage(message: Data) {
        // ...
    }

    /// To be invoked by the event handler when `Stream.Event.hasBytesAvailable` is signalled.
    ///
    /// Performs a single read into the unused tail of `buffer`, then delivers every
    /// complete message found at the front and shifts the unconsumed bytes down to index 0.
    func handleInput(istr: InputStream) {
        assert(bytesRead < buffer.count)

        // Fill the free tail of the buffer with whatever the stream delivers.
        let freeSpace = buffer.count - bytesRead
        let received = buffer.withUnsafeMutableBytes { (base: UnsafeMutablePointer<UInt8>) -> Int in
            return istr.read(base.advanced(by: bytesRead), maxLength: freeSpace)
        }
        if received <= 0 {
            // handle EOF or read error ...
            fatalError()
        }
        bytesRead += received

        // Drain complete frames for as long as at least a length prefix is present.
        while bytesRead >= 4 {
            // The first four bytes hold the payload length in big-endian order.
            let payloadLength = buffer.withUnsafeBytes { (header: UnsafePointer<UInt32>) -> Int in
                return Int(UInt32(bigEndian: header.pointee))
            }
            let frameLength = payloadLength + 4
            if frameLength > buffer.count {
                // Handle buffer too small for message situation ...
                fatalError()
            }

            // Stop until the rest of this frame has arrived.
            guard bytesRead >= frameLength else {
                break
            }

            // A whole frame is buffered: hand its payload over ...
            processMessage(message: buffer[4 ..< frameLength])

            // ... then drop the frame by sliding the unconsumed bytes to the front.
            let leftover = bytesRead - frameLength
            if leftover > 0 {
                buffer.withUnsafeMutableBytes { (base: UnsafeMutablePointer<UInt8>) in
                    _ = memmove(base, base + frameLength, leftover)
                }
            }
            bytesRead = leftover
        }
    }
}

受到 Martin R 的启发(非常感谢!),我想出了这个解决方案:

var buffer = Data(count: 4096) // scratch space for reads; its size only limits how much one read can deliver
var offset = 0 // number of valid bytes at the front of `buffer`; the next read is written at this index
var readState = 0 // 0 = waiting for the 4-byte length header, 1 = collecting payload bytes
var missingMsgBytes = 0 // payload bytes of the current message that have not arrived yet
var msg = "" // holds the decoded message only while `handleMessage` is running
var msgBytes = Data() // raw payload collected so far for the current message

/// Reads whatever the stream currently offers and delivers every message completed by it.
///
/// Messages are framed as a 4-byte big-endian length followed by that many bytes of UTF-8
/// text. A message may span any number of calls, and one call may complete several
/// messages; each complete message is passed to `handleMessage`.
///
/// Payload bytes are accumulated undecoded in `msgBytes` and converted to a string only
/// once the message is complete, because a multi-byte UTF-8 character can be split across
/// two reads. A payload that is not valid UTF-8 is reported through `errorHandler` and
/// skipped; framing stays intact, so later messages are still delivered.
///
/// - Parameter istr: The opened input stream to read from.
func handleInput(_ istr: InputStream) {
    assert(buffer.count >= 5, "buffer must be large enough to contain length info (4 bytes) and at least one payload byte => min 5 bytes buffer required")
    assert(offset < buffer.count, "offset \(offset) is not smaller than \(buffer.count)")

    let toRead = buffer.count - offset
    let read = buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in istr.read(p + offset, maxLength: toRead) }
    guard read > 0 else {
        self.errorHandler(NetworkingError.InputError("Input Stream received \(read) bytes which is smaller than 0 => Network error"))
        return
    }
    offset += read

    // Index of the first byte in buffer[0..<offset] that has not been consumed yet.
    var cursor = 0

    while true {
        if readState == 0 {
            // A header is only usable once all four of its bytes are buffered.
            guard offset - cursor >= 4 else { break }
            // Assemble the big-endian length byte by byte; this works at any cursor
            // position and needs no aligned UInt32 load.
            missingMsgBytes = Int(buffer[cursor..<cursor + 4].reduce(UInt32(0)) { ($0 << 8) | UInt32($1) })
            cursor += 4
            readState = 1
        }

        // Take as much payload as is both available and still missing.
        let take = min(missingMsgBytes, offset - cursor)
        if take > 0 {
            msgBytes.append(buffer[cursor..<cursor + take])
            cursor += take
            missingMsgBytes -= take
        }

        // Payload incomplete: everything buffered has been consumed, wait for the next event.
        guard missingMsgBytes == 0 else { break }

        // Message complete (this also covers zero-length messages). Reset the per-message
        // state before decoding so that a failure cannot leak into the next message.
        let completed = msgBytes
        msgBytes = Data()
        readState = 0

        guard let text = String(data: completed, encoding: .utf8) else {
            self.errorHandler(NetworkingError.InputError("Received message payload is not valid UTF-8"))
            continue
        }
        msg = text
        handleMessage(msg)
        msg = ""
    }

    // Only a partial header (at most 3 bytes) can be left over; keep it at the front so the
    // next read appends directly behind it.
    let leftover = buffer.subdata(in: cursor..<offset)
    if cursor > 0 && !leftover.isEmpty {
        buffer.replaceSubrange(0..<leftover.count, with: leftover)
    }
    offset = leftover.count
}

此解决方案能够读取任意大小的消息。缓冲区大小只决定一次可以读取多少=>缓冲区越大,应用程序越快。

我现在测试了大约一个小时的代码,它没有崩溃。旧代码在 1-2 分钟后崩溃。现在好像终于可以用了。

但是因为我想提高我的编程知识,所以我想问一下我的代码中是否有一些不必要的复杂的东西,或者是否有人看到一个可能仍然导致应用程序崩溃或读取错误数据的错误?