如何修复或忽略通过套接字传输的错误解码图像

How to fix or ignore wrong decoded image streamed via socket

我使用 UDP 套接字将图像从客户端流式传输到服务器。对于编码和解码,我使用 OpenCV。有时我得到一个错误的解码图像,因为一个或一些数据包丢失(只发送 header,请查看我的终端屏幕以获取一些信息)。我必须将 jpeg 的质量降低到只有 30 以减少错误的解码图像比率。如何使用条件代码忽略丢失某些数据包的帧(不进行解码工作),或者不在 imshow 函数中显示错误的解码图像。

这是错误的解码图像:

终端跟踪屏幕:

我的客户代码:

#include "PracticalSocket.h"
#include <iostream>
#include <cstdlib>

#include "cv.hpp"
#include "config.h"

#include "logger.h" // For trace
using namespace ModernCppCI;

using namespace cv;
using namespace std;

int main(int argc, char * argv[]) {

Logger log{__func__};

if ((argc < 4) || (argc > 4)) { // Test for correct number of arguments
    log.error("Usage: {} <Server> <Server Port>\n <RTSP link>", argv[0]);
    exit(1);
}

string servAddress = argv[1]; // First arg: server address
unsigned short servPort = Socket::resolveService(argv[2], "udp");

try {
    UDPSocket sock;
    int jpegqual =  ENCODE_QUALITY; // It's 30

    Mat frame, send;
    vector < uchar > encoded;
    //VideoCapture cap("rtsp://admin:centic.vn@10.49.34.234/Streaming/Channels/1?tcp"); // Grab the camera
    VideoCapture cap(argv[3]);
    if (!cap.isOpened()) {
        log.error("OpenCV failed to open camera");
        exit(1);
    }

    clock_t last_cycle = clock();
    unsigned char pressed_key;
    while (1) {
        vector < int > compression_params;
        cap >> send;
        if(send.empty())continue;
        // JPEG encoding
        compression_params.push_back(CV_IMWRITE_JPEG_QUALITY);
        compression_params.push_back(jpegqual);
        imencode(".jpg", send, encoded, compression_params);


        imshow("send", send);

        int total_pack = 1 + (encoded.size() - 1) / PACK_SIZE; // PACK_SIZE is 4096

        int ibuf[1];
        ibuf[0] = total_pack;
        sock.sendTo(ibuf, sizeof(int), servAddress, servPort);

        for (int i = 0; i < total_pack; i++)
            sock.sendTo( & encoded[i * PACK_SIZE], PACK_SIZE, servAddress, servPort);

        pressed_key = waitKey(1);

        if(pressed_key == ' ')
            pressed_key = waitKey(0);

        if(pressed_key == 'q')
            break;
        clock_t next_cycle = clock();
        double duration = (next_cycle - last_cycle) / (double) CLOCKS_PER_SEC;
        log.info(" FPS: {}, kbps: {}, Processing time: {}ms" , (1 / duration), (PACK_SIZE * total_pack / duration / 1024 * 8), 1000*duration);

        last_cycle = next_cycle;
    }
    // Destructor closes the socket

} catch (SocketException & e) {
    log.error(e.what());
    exit(1);
}

return 0;
}

服务器代码

#include "PracticalSocket.h"
#include <iostream>
#include <cstdlib>

#include "cv.hpp"
#include "config.h"
#include "logger.h" // For trace

using namespace ModernCppCI;
using namespace cv;


int main(int argc, char * argv[]) {

Logger log{__func__};

if (argc != 2) { // Test for correct number of parameters
    log.error("Usage: {} <Server Port>", argv[0]);
    exit(1);
}

unsigned short servPort = atoi(argv[1]); // First arg:  Server port

try {
    UDPSocket sock(servPort);

    char buffer[BUF_LEN]; // Buffer for echo string
    int recvMsgSize; // Size of received message
    string sourceAddress; // Address of datagram source
    unsigned short sourcePort; // Port of datagram source

    clock_t last_cycle = clock();
    unsigned char pressed_key;
    while (1) {
        // Block until receive message from a client
        do {
            recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort); // BUF_LEN is 65540
        } while (recvMsgSize > sizeof(int));
        int total_pack = ((int * ) buffer)[0];

        log.info("expecting length of packs: {}", total_pack);
        char * longbuf = new char[PACK_SIZE * total_pack];
        for (int i = 0; i < total_pack; i++) {
            recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort);
            if (recvMsgSize != PACK_SIZE) {
                log.error("Received unexpected size pack: {}", recvMsgSize);
                continue;
            }
            memcpy( & longbuf[i * PACK_SIZE], buffer, PACK_SIZE); // Copy PACK_SIZE bytes from buffer to longbuf
        }

        log.info("Received packet from {}:{}", sourceAddress, sourcePort);
        Logger::level(LogLevel::trace);
        log.trace("longbuf size: {}", ((int * ) &longbuf)[0]);

        Mat rawData = Mat(1, PACK_SIZE * total_pack, CV_8UC1, longbuf);
        Mat frame = imdecode(rawData, CV_LOAD_IMAGE_COLOR);
        if (frame.empty()) {
            log.error("Decode failure!");
            continue;
        }
        imshow("recv", frame);
        pressed_key = waitKey(1);

        if(pressed_key == ' ')
            pressed_key = waitKey(0);

        if(pressed_key == 'q')
            break;

        free(longbuf);

        clock_t next_cycle = clock();
        double duration = (next_cycle - last_cycle) / (double) CLOCKS_PER_SEC;
        log.info(" FPS: {} , kbps: {} , Processing time: {}", (1 / duration), (PACK_SIZE * total_pack / duration / 1024 * 8), (next_cycle - last_cycle));

        last_cycle = next_cycle;
    }
} catch (SocketException & e) {
    log.error(e.what());
    exit(1);
  }

return 0;
}

如果我正确理解你的问题,你的协议是:

  1. 发送header(ibuf),其中包含N个预期的数据包。
  2. 发送N个数据包

服务器端发生的事情是:

  1. 收到header
  2. 收到N-1个数据包(丢失一个)
  3. 接收下一个header作为数据包并丢弃当前帧。
  4. 等待新帧header,因此丢失 2 帧。

这里你忽略的是header和数据包的区别。您已经使用的最简单的方法是检查数据包的大小。知道您可以确定如何处理当前数据包 - 它是新帧的开始(因此以前的数据已经消失)还是新数据。这样你就可以开始读取新帧,只有在数据包丢失时才会丢失。

这个片段展示了它的一个想法:

    int total_pack = 0;
    int counter = 0;
    char * longbuf = nullptr;
    while (1) {
        recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort); // BUF_LEN is 65540

        if (recvMsgSize == sizeof(int)) { // header
            total_pack = ((int *)buffer)[0];
            counter = 0; // reset frame counter
            log.info("expecting length of packs: {}", total_pack);
            if (longbuf) delete[] longbuf;
            longbuf = new char[PACK_SIZE * total_pack];
        }
        else if (recvMsgSize == PACK_SIZE){ // if we know size of incoming frame
            if (total_pack > 0) { // skip it if we dont know header yet
                memcpy(&longbuf[counter * PACK_SIZE], buffer, PACK_SIZE); // Copy PACK_SIZE bytes from buffer to longbuf
                counter++;
                if (counter == total_pack) {
                    total_pack = 0; // clear header
                    break; // whole frame received
                }
            }
        }
        else
            log.error("Received unexpected size pack: {}", recvMsgSize);
    }

此外,数据包应包含小 header 及其在整个缓冲区中的位置(帧号也很有用),因为 UDP 数据包不是按发送顺序接收的。 (他们可能会转移)。

你不应该混合使用 C++ 和 C 分配(新的和免费的) Can you mix free and constructor in C++?

我正在尝试理解您的问题。当您检测到一个意外大小的数据包时,您想要跳过整个帧;基本上在您检查解码失败并继续时,您会想要检查帧是否有意外大小的数据包并继续?或者之前,也跳过解码尝试....

如果这就是你想要做的,你可以这样做:

1.Add while 循环级别的标志:

while (1) {
bool goodFrame = true; // start out optimistic!

// Block until receive message from a client

2.Change检测到坏包时的标志:

        if (recvMsgSize != PACK_SIZE) {
            log.error("Received unexpected size pack: {}", recvMsgSize);
            goodFrame = false;
            continue;
        }

3.Check标志并跳过帧的解码和使用:

    log.trace("longbuf size: {}", ((int * ) &longbuf)[0]);

    if (!goodFrame) {
        // you probably do not need to log an error, as you did it above when you detected the bad packet.  
        continue;
    }

    Mat rawData = Mat(1, PACK_SIZE * total_pack, CV_8UC1, longbuf);

4.You 可能还想跳过复制此帧中的其余数据包,因为无论如何都不会使用该帧:

        if (goodFrame)
            memcpy( & longbuf[i * PACK_SIZE], buffer, PACK_SIZE); // Copy PACK_SIZE bytes from buffer to longbuf

因此,完整的 while 循环结果如下所示:

while (1) {
    bool goodFrame = true; // start out optimistic!

    // Block until receive message from a client
    do {
        recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort); // BUF_LEN is 65540
    } while (recvMsgSize > sizeof(int));
    int total_pack = ((int * ) buffer)[0];

    log.info("expecting length of packs: {}", total_pack);
    char * longbuf = new char[PACK_SIZE * total_pack];
    for (int i = 0; i < total_pack; i++) {
        recvMsgSize = sock.recvFrom(buffer, BUF_LEN, sourceAddress, sourcePort);
        if (recvMsgSize != PACK_SIZE) {
            log.error("Received unexpected size pack: {}", recvMsgSize);
            goodFrame = false;
            continue;
        }
    if (goodFrame)
            memcpy( & longbuf[i * PACK_SIZE], buffer, PACK_SIZE); // Copy PACK_SIZE bytes from buffer to longbuf
    }

    log.info("Received packet from {}:{}", sourceAddress, sourcePort);
    Logger::level(LogLevel::trace);
    log.trace("longbuf size: {}", ((int * ) &longbuf)[0]);

    if (!goodFrame) {
        // you probably do not need to log an error, as you did it above when you detected the bad packet.  
        continue;
    }

    Mat rawData = Mat(1, PACK_SIZE * total_pack, CV_8UC1, longbuf);
    Mat frame = imdecode(rawData, CV_LOAD_IMAGE_COLOR);
    if (frame.empty()) {
        log.error("Decode failure!");
        continue;
    }
    imshow("recv", frame);
    pressed_key = waitKey(1);

    if(pressed_key == ' ')
        pressed_key = waitKey(0);

    if(pressed_key == 'q')
        break;

    free(longbuf);

    clock_t next_cycle = clock();
    double duration = (next_cycle - last_cycle) / (double) CLOCKS_PER_SEC;
    log.info(" FPS: {} , kbps: {} , Processing time: {}", (1 / duration), (PACK_SIZE * total_pack / duration / 1024 * 8), (next_cycle - last_cycle));

    last_cycle = next_cycle;
}

如果我误解了你的问题,请澄清你的问题,希望我能提供更多帮助。