jpeg_write_scanlines 和 glTexImage2D 线程安全。为什么这不会崩溃?
jpeg_write_scanlines and glTexImage2D thread safety. Why doesn't this crash?
我正在开发视频软件并使用一些现有代码。现有代码包括一个循环缓冲区。作为制作人,我有一台摄像机,作为消费者,我有两个不同的线程。一个是 GLThread,使用 OpenGL 绘制帧,另一个是 VideoCompressorThread,将帧压缩为 jpeg 格式以将其保存到视频文件中。奇怪的是,目前这两个线程同时处理相同的数据,但这不会产生竞争条件。在 GLThread 中我有:
while(!shouldStop) {
mutex_.lock();
glw_->makeCurrent();
shaderProgram_.bind();
shaderProgram_.setUniformValue("texture", 0);
shaderProgram_.setAttributeArray("vertex", vertices_.constData());
shaderProgram_.enableAttributeArray("vertex");
shaderProgram_.setAttributeArray("textureCoordinate", textureCoordinates_.constData());
shaderProgram_.enableAttributeArray("textureCoordinate");
qDebug() << "GLThread: " << "data address: " << static_cast<void*>(imBuf_) << "time: " << QDateTime::currentMSecsSinceEpoch();
glTexImage2D(GL_TEXTURE_2D, 0, GL_RGB8, VIDEO_WIDTH, VIDEO_HEIGHT, 0, GL_RGB, GL_UNSIGNED_BYTE, (GLubyte*)imBuf_);
qDebug() << "GLThread finished";
glClear(GL_COLOR_BUFFER_BIT);
glDrawArrays(GL_TRIANGLES, 0, 6);
glw_->swapBuffers();
shaderProgram_.disableAttributeArray("vertex");
shaderProgram_.disableAttributeArray("textureCoordinate");
shaderProgram_.release();
glw_->doneCurrent();
mutex_.unlock();
}
并且在 VideoCompressorThread 中:
while(!shouldStop)
{
// JPEG-related stuff
struct jpeg_compress_struct cinfo;
struct jpeg_error_mgr jerr;
JSAMPROW row_pointer;
unsigned char* jpgBuf=NULL;
unsigned long jpgBufLen=0;
unsigned char* data;
ChunkAttrib chunkAttrib;
// Get raw image from the input buffer
data = inpBuf->getChunk(&chunkAttrib);
// Initialize JPEG
cinfo.err = jpeg_std_error(&jerr);
jpeg_create_compress(&cinfo);
jpeg_mem_dest(&cinfo, &jpgBuf, &jpgBufLen);
// Set the parameters of the output file
cinfo.image_width = VIDEO_WIDTH;
cinfo.image_height = VIDEO_HEIGHT;
cinfo.input_components = 3;
cinfo.in_color_space = JCS_RGB;
// Use default compression parameters
jpeg_set_defaults(&cinfo);
jpeg_set_quality(&cinfo, jpgQuality, TRUE);
// Do the compression
jpeg_start_compress(&cinfo, TRUE);
// write one row at a time
qDebug() << "VideoCompressorThread: " << "data address: " << static_cast<void*>(data) << "time: " << QDateTime::currentMSecsSinceEpoch();
while(cinfo.next_scanline < cinfo.image_height)
{
row_pointer = (data + (cinfo.next_scanline * cinfo.image_width * 3));
jpeg_write_scanlines(&cinfo, &row_pointer, 1);
}
qDebug() << "VideoCompressorThread finished";
// clean up after we're done compressing
jpeg_finish_compress(&cinfo);
// Insert compressed image into the output buffer
chunkAttrib.chunkSize = jpgBufLen;
outBuf->insertChunk(jpgBuf, chunkAttrib);
// The output buffer needs to be explicitly freed by the libjpeg client
free(jpgBuf);
jpeg_destroy_compress(&cinfo);
}
作为输出我得到:
VideoCompressorThread: data address: 0x7fffbdcd1060 time: 1438594694479
VideoCompressorThread finished
GLThread: data address: 0x7fffbdcd1060 time: 1438594694488
GLThread finished
GLThread: data address: 0x7fffbddb20b0 time: 1438594694497
GLThread finished
VideoCompressorThread: data address: 0x7fffbddb20b0 time: 1438594694498
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbde93100 time: 1438594694521
GLThread: data address: 0x7fffbde93100 time: 1438594694521
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbdf74150 time: 1438594694538
GLThread: data address: 0x7fffbdf74150 time: 1438594694538
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbe0551a0 time: 1438594694555
GLThread: data address: 0x7fffbe0551a0 time: 1438594694555
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbe1361f0 time: 1438594694571
GLThread: data address: 0x7fffbe1361f0 time: 1438594694571
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbe217240 time: 1438594694588
GLThread: data address: 0x7fffbe217240 time: 1438594694588
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbe2f8290 time: 1438594694604
GLThread: data address: 0x7fffbe2f8290 time: 1438594694604
GLThread finished
VideoCompressorThread finished
如您所见,有时两个线程同时访问相同的数据但没有崩溃。这是纯粹的运气,还是我在这里不明白?如果有任何不同,我正在使用 Xubuntu 14.04。
编辑。 insertChunk 和 getChunk() 函数。请注意,只有 VideoCompressorThread 使用 getChunk() 获取数据指针。 GLThread 连接到 chunkReady qt 信号。这使得缓冲区可以使用一个主要消费者和多个次要消费者。
void CycDataBuffer::insertChunk(unsigned char* _data, ChunkAttrib &_attrib)
{
// Check for buffer overflow. CIRC_BUF_MARG is the safety margin against
// race condition between consumer and producer threads when the buffer
// is close to full.
if (buffSemaphore->available() >= bufSize * (1-CIRC_BUF_MARG))
{
cerr << "Circular buffer overflow!" << endl;
abort();
}
// Make sure that the safety margin is at least several (four) times the
// chunk size. This is necessary to prevent the race condition between
// consumer and producer threads when the buffer is close to full.
if(_attrib.chunkSize+sizeof(ChunkAttrib)+MAXLOG > bufSize*MAX_CHUNK_SIZE)
{
cerr << "The chunk size is too large!" << endl;
abort();
}
// insert the data into the circular buffer
_attrib.isRec = isRec;
memcpy(dataBuf + insertPtr, (unsigned char*)(&_attrib), sizeof(ChunkAttrib));
insertPtr += sizeof(ChunkAttrib);
buffSemaphore->release(sizeof(ChunkAttrib));
memcpy(dataBuf + insertPtr, _data, _attrib.chunkSize);
buffSemaphore->release(_attrib.chunkSize);
emit chunkReady(dataBuf + insertPtr);
insertPtr += _attrib.chunkSize;
if(insertPtr >= bufSize)
{
insertPtr = 0;
}
}
unsigned char* CycDataBuffer::getChunk(ChunkAttrib* _attrib)
{
unsigned char* res;
buffSemaphore->acquire(sizeof(ChunkAttrib));
memcpy((unsigned char*)_attrib, dataBuf + getPtr, sizeof(ChunkAttrib));
getPtr += sizeof(ChunkAttrib);
buffSemaphore->acquire(_attrib->chunkSize);
res = dataBuf + getPtr;
getPtr += _attrib->chunkSize;
if(getPtr >= bufSize)
{
getPtr = 0;
}
return(res);
}
仅仅因为它没有崩溃并不意味着它不是错误。在另一个线程正在读取时写入缓冲区通常会导致 读取线程读取的数据损坏。一些字节读取为新值,一些字节读取为旧值。
您会看到发生的事情之一是图像缓冲区的某些部分被覆盖,而另一个线程正在处理它,这会导致在观看视频时 screen-tearing。您可以通过在屏幕上快速移动的对角线条纹看到最佳效果。
2 个线程读取同一个缓冲区完全没问题,当一个线程开始写入时问题就开始了。
除了下面棘轮怪人的精彩回答 "Just because it doesn't crash doesn't mean it's not a bug" 之外,我想补充一点,我实际上看不出这两个特定代码片段无法并行工作的原因。两者都以只读方式访问相同的图像数据,这非常好。
只有当至少有两个线程(或进程,在共享内存的情况下)访问同一个缓冲区并且至少其中一个正在修改它,即通过覆盖数据,或通过取消分配缓冲区。
我正在开发视频软件并使用一些现有代码。现有代码包括一个循环缓冲区。作为制作人,我有一台摄像机,作为消费者,我有两个不同的线程。一个是 GLThread,使用 OpenGL 绘制帧,另一个是 VideoCompressorThread,将帧压缩为 jpeg 格式以将其保存到视频文件中。奇怪的是,目前这两个线程同时处理相同的数据,但这不会产生竞争条件。在 GLThread 中我有:
while(!shouldStop) {
mutex_.lock();
glw_->makeCurrent();
shaderProgram_.bind();
shaderProgram_.setUniformValue("texture", 0);
shaderProgram_.setAttributeArray("vertex", vertices_.constData());
shaderProgram_.enableAttributeArray("vertex");
shaderProgram_.setAttributeArray("textureCoordinate", textureCoordinates_.constData());
shaderProgram_.enableAttributeArray("textureCoordinate");
qDebug() << "GLThread: " << "data address: " << static_cast<void*>(imBuf_) << "time: " << QDateTime::currentMSecsSinceEpoch();
glTexImage2D(GL_TEXTURE_2D, 0, GL_RGB8, VIDEO_WIDTH, VIDEO_HEIGHT, 0, GL_RGB, GL_UNSIGNED_BYTE, (GLubyte*)imBuf_);
qDebug() << "GLThread finished";
glClear(GL_COLOR_BUFFER_BIT);
glDrawArrays(GL_TRIANGLES, 0, 6);
glw_->swapBuffers();
shaderProgram_.disableAttributeArray("vertex");
shaderProgram_.disableAttributeArray("textureCoordinate");
shaderProgram_.release();
glw_->doneCurrent();
mutex_.unlock();
}
并且在 VideoCompressorThread 中:
while(!shouldStop)
{
// JPEG-related stuff
struct jpeg_compress_struct cinfo;
struct jpeg_error_mgr jerr;
JSAMPROW row_pointer;
unsigned char* jpgBuf=NULL;
unsigned long jpgBufLen=0;
unsigned char* data;
ChunkAttrib chunkAttrib;
// Get raw image from the input buffer
data = inpBuf->getChunk(&chunkAttrib);
// Initialize JPEG
cinfo.err = jpeg_std_error(&jerr);
jpeg_create_compress(&cinfo);
jpeg_mem_dest(&cinfo, &jpgBuf, &jpgBufLen);
// Set the parameters of the output file
cinfo.image_width = VIDEO_WIDTH;
cinfo.image_height = VIDEO_HEIGHT;
cinfo.input_components = 3;
cinfo.in_color_space = JCS_RGB;
// Use default compression parameters
jpeg_set_defaults(&cinfo);
jpeg_set_quality(&cinfo, jpgQuality, TRUE);
// Do the compression
jpeg_start_compress(&cinfo, TRUE);
// write one row at a time
qDebug() << "VideoCompressorThread: " << "data address: " << static_cast<void*>(data) << "time: " << QDateTime::currentMSecsSinceEpoch();
while(cinfo.next_scanline < cinfo.image_height)
{
row_pointer = (data + (cinfo.next_scanline * cinfo.image_width * 3));
jpeg_write_scanlines(&cinfo, &row_pointer, 1);
}
qDebug() << "VideoCompressorThread finished";
// clean up after we're done compressing
jpeg_finish_compress(&cinfo);
// Insert compressed image into the output buffer
chunkAttrib.chunkSize = jpgBufLen;
outBuf->insertChunk(jpgBuf, chunkAttrib);
// The output buffer needs to be explicitly freed by the libjpeg client
free(jpgBuf);
jpeg_destroy_compress(&cinfo);
}
作为输出我得到:
VideoCompressorThread: data address: 0x7fffbdcd1060 time: 1438594694479
VideoCompressorThread finished
GLThread: data address: 0x7fffbdcd1060 time: 1438594694488
GLThread finished
GLThread: data address: 0x7fffbddb20b0 time: 1438594694497
GLThread finished
VideoCompressorThread: data address: 0x7fffbddb20b0 time: 1438594694498
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbde93100 time: 1438594694521
GLThread: data address: 0x7fffbde93100 time: 1438594694521
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbdf74150 time: 1438594694538
GLThread: data address: 0x7fffbdf74150 time: 1438594694538
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbe0551a0 time: 1438594694555
GLThread: data address: 0x7fffbe0551a0 time: 1438594694555
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbe1361f0 time: 1438594694571
GLThread: data address: 0x7fffbe1361f0 time: 1438594694571
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbe217240 time: 1438594694588
GLThread: data address: 0x7fffbe217240 time: 1438594694588
GLThread finished
VideoCompressorThread finished
VideoCompressorThread: data address: 0x7fffbe2f8290 time: 1438594694604
GLThread: data address: 0x7fffbe2f8290 time: 1438594694604
GLThread finished
VideoCompressorThread finished
如您所见,有时两个线程同时访问相同的数据但没有崩溃。这是纯粹的运气,还是我在这里不明白?如果有任何不同,我正在使用 Xubuntu 14.04。
编辑。 insertChunk 和 getChunk() 函数。请注意,只有 VideoCompressorThread 使用 getChunk() 获取数据指针。 GLThread 连接到 chunkReady qt 信号。这使得缓冲区可以使用一个主要消费者和多个次要消费者。
void CycDataBuffer::insertChunk(unsigned char* _data, ChunkAttrib &_attrib)
{
// Check for buffer overflow. CIRC_BUF_MARG is the safety margin against
// race condition between consumer and producer threads when the buffer
// is close to full.
if (buffSemaphore->available() >= bufSize * (1-CIRC_BUF_MARG))
{
cerr << "Circular buffer overflow!" << endl;
abort();
}
// Make sure that the safety margin is at least several (four) times the
// chunk size. This is necessary to prevent the race condition between
// consumer and producer threads when the buffer is close to full.
if(_attrib.chunkSize+sizeof(ChunkAttrib)+MAXLOG > bufSize*MAX_CHUNK_SIZE)
{
cerr << "The chunk size is too large!" << endl;
abort();
}
// insert the data into the circular buffer
_attrib.isRec = isRec;
memcpy(dataBuf + insertPtr, (unsigned char*)(&_attrib), sizeof(ChunkAttrib));
insertPtr += sizeof(ChunkAttrib);
buffSemaphore->release(sizeof(ChunkAttrib));
memcpy(dataBuf + insertPtr, _data, _attrib.chunkSize);
buffSemaphore->release(_attrib.chunkSize);
emit chunkReady(dataBuf + insertPtr);
insertPtr += _attrib.chunkSize;
if(insertPtr >= bufSize)
{
insertPtr = 0;
}
}
unsigned char* CycDataBuffer::getChunk(ChunkAttrib* _attrib)
{
unsigned char* res;
buffSemaphore->acquire(sizeof(ChunkAttrib));
memcpy((unsigned char*)_attrib, dataBuf + getPtr, sizeof(ChunkAttrib));
getPtr += sizeof(ChunkAttrib);
buffSemaphore->acquire(_attrib->chunkSize);
res = dataBuf + getPtr;
getPtr += _attrib->chunkSize;
if(getPtr >= bufSize)
{
getPtr = 0;
}
return(res);
}
仅仅因为它没有崩溃并不意味着它不是错误。在另一个线程正在读取时写入缓冲区通常会导致 读取线程读取的数据损坏。一些字节读取为新值,一些字节读取为旧值。
您会看到发生的事情之一是图像缓冲区的某些部分被覆盖,而另一个线程正在处理它,这会导致在观看视频时 screen-tearing。您可以通过在屏幕上快速移动的对角线条纹看到最佳效果。
2 个线程读取同一个缓冲区完全没问题,当一个线程开始写入时问题就开始了。
除了下面棘轮怪人的精彩回答 "Just because it doesn't crash doesn't mean it's not a bug" 之外,我想补充一点,我实际上看不出这两个特定代码片段无法并行工作的原因。两者都以只读方式访问相同的图像数据,这非常好。
只有当至少有两个线程(或进程,在共享内存的情况下)访问同一个缓冲区并且至少其中一个正在修改它,即通过覆盖数据,或通过取消分配缓冲区。