基于生产者-消费者的多线程图像处理
Producer-consumer based multi-threading for image processing
更新:我在下面的回答中提供了问题的原因及其解决方案。
我想为图像处理任务实现基于生产者-消费者方法的多线程。对于我的情况,Producer
线程应该抓取图像并将它们放入 container
而消费者线程应该从 Container
线程中提取图像。我认为我应该使用 queue
来实现 container
。
我想按照 中的建议使用以下代码。但是我对 container
的 实现以及在 Producer
线程中将传入图像放入其中感到非常困惑。
问题: 第一个 consumer thread
显示的图像不包含完整数据。并且,第二个 consumer thread
从不显示任何图像。可能是由于某些竞争情况或锁定情况导致第二个线程根本无法访问队列的数据。我已经尝试使用 Mutex
.
#include <vector>
#include <thread>
#include <memory>
#include <queue>
#include <opencv2/highgui.hpp>
#include <opencv2/core.hpp>
#include <opencv2/imgproc.hpp>
Mutex mu;
struct ThreadSafeContainer
{
queue<unsigned char*> safeContainer;
};
struct Producer
{
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
void run()
{
while(true)
{
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
unsigned char *pt_src = image.data;
mu.lock();
container->safeContainer.push(pt_src);
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
struct Consumer
{
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
~Consumer()
{
}
void run()
{
while(true)
{
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
{
unsigned char *ptr_consumer_Image;
ptr_consumer_Image = container->safeContainer.front(); //The front of the queue contain the pointer to the image data
container->safeContainer.pop();
Mat image(400, 400, CV_8UC3);
image.data = ptr_consumer_Image;
imshow("consumer image", image);
waitKey(33);
}
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
int main()
{
//Pointer object to the class containing a "container" which will help "Producer" and "Consumer" to put and take images
auto ptrObject_container = make_shared<ThreadSafeContainer>();
//Pointer object to the Producer...intialize the "container" variable of "Struct Producer" with the above created common "container"
auto ptrObject_producer = make_shared<Producer>(ptrObject_container);
//FIRST Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto first_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//SECOND Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto second_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//RUN producer thread
thread producerThread(&Producer::run, ptrObject_producer);
//RUN first thread of Consumer
thread first_consumerThread(&Consumer::run, first_ptrObject_consumer);
//RUN second thread of Consumer
thread second_consumerThread(&Consumer::run, second_ptrObject_consumer);
//JOIN all threads
producerThread.join();
first_consumerThread.join();
second_consumerThread.join();
return 0;
}
我在你的原始问题中没有看到实际问题,所以我会给你参考 material 我在大学课程中曾经实现过生产者-消费者。
http://cs360.byu.edu/static/lectures/winter-2014/semaphores.pdf
幻灯片 13 和 17 给出了生产者-消费者的好例子
我在我的 github 此处发布的实验室中使用了它:
https://github.com/qzcx/Internet_Programming/tree/master/ThreadedMessageServer
如果您查看我的 server.cc,您可以看到我对生产者-消费者模式的实现。
请记住,使用此模式时您不能切换等待语句的顺序,否则您可能会陷入死锁。
希望这对您有所帮助。
编辑:
好的,下面是我上面链接的代码中消费者-生产者模式的总结。生产者消费者背后的想法是采用线程安全的方式将任务从 "producer" 线程传递到 "consumer" 工作线程。在我的示例中,要做的工作是处理客户端请求。生产者线程 (.serve()) 监视传入的套接字并将连接传递给消费者线程 (.handle()) 以处理传入的实际请求。此模式的所有代码都可以在 server.cc 文件(在 server.h 中有一些 declarations/imports)。
为了简洁起见,我省略了一些细节。一定要遍历每一行并了解发生了什么。查找我正在使用的库函数以及参数的含义。我在这里给了你很多帮助,但你还有很多工作要做才能获得全面的理解。
制片人:
就像我上面提到的,整个生产者线程都在 .serve() 函数中。它做了以下事情
- 初始化信号量。由于 OS 差异,这里有两个版本。我在 OS X 上编程,但必须在 Linux 上打开代码。由于信号量与 OS 相关联,了解如何在您的特定设置中使用信号量很重要。
- 它设置套接字供客户端对话。对您的申请不重要。
- 创建消费者线程。
- 监视客户端套接字并使用生产者模式将项目传递给消费者。此代码如下
在 .serve() 函数的底部,您可以看到以下代码:
while ((client = accept(server_,(struct sockaddr *)&client_addr,&clientlen)) > 0) {
sem_wait(clients_.e); //buffer check
sem_wait(clients_.s);
clients_.q->push(client);
sem_post(clients_.s);
sem_post(clients_.n); //produce
}
首先,您检查缓冲区信号量 "e" 以确保您的队列中有空间来放置请求。其次,获取队列的信号量"s"。然后将您的任务(在本例中为客户端连接)添加到队列中。释放队列的信号量。最后,使用信号量 "n" 向消费者发出信号。
消费者:
在 .handle() 方法中,您实际上只关心线程的开头。
while(1){
sem_wait(clients_.n); //consume
sem_wait(clients_.s);
client = clients_.q->front();
clients_.q->pop();
sem_post(clients_.s);
sem_post(clients_.e); //buffer free
//Handles the client requests until they disconnect.
}
消费者执行与生产者类似的操作,但方式相反。首先,消费者等待生产者在信号量 "n" 上发出信号。请记住,由于有多个消费者,因此最终获得该信号量的消费者完全是随机的。他们为它而战,但每个 sem_post 信号量只能让一个人通过这一点。其次,他们像生产者一样获取队列信号量。从队列中弹出第一项并释放信号量。最后,它们在缓冲区信号量 "e" 上发出信号,表明缓冲区中现在有更多空间。
免责声明:
我知道信号量的名字很糟糕。它们与我教授的幻灯片相匹配,因为那是我学到的。我认为他们代表以下内容:
- e 表示空:如果队列已满,此信号量会阻止生产者将更多项目推送到队列中。
- s for semaphore:我最不喜欢的。但是我的教授的风格是为每个共享数据结构都有一个结构。在这种情况下 "clients_" 是包含所有三个信号量和队列的结构。基本上这个信号量是为了确保没有两个线程同时接触同一个数据结构。
- n 表示队列中的项目数。
好的,所以让它尽可能简单。您将需要 2 个线程、互斥锁、queue 和 2 个线程处理函数。
Header.h
static DWORD WINAPI ThreadFunc_Prod(LPVOID lpParam);
static DWORD WINAPI ThreadFunc_Con(LPVOID lpParam);
HANDLE m_hThread[2];
queue<int> m_Q;
mutex m_M;
添加所有需要的东西,这些只是您需要的核心部分
Source.cpp
DWORD dwThreadId;
m_hThread[0] = CreateThread(NULL, 0, this->ThreadFunc_Prod, this, 0, &dwThreadId);
// same for 2nd thread
DWORD WINAPI Server::ThreadFunc_Prod(LPVOID lpParam)
{
cYourClass* o = (cYourClass*) lpParam;
int nData2Q = GetData(); // this is whatever you use to get your data
m_M.lock();
m_Q.push(nData2Q);
m_M.unlock();
}
DWORD WINAPI Server::ThreadFunc_Con(LPVOID lpParam)
{
cYourClass* o = (cYourClass*) lpParam;
int res;
m_M.lock();
if (m_Q.empty())
{
// bad, no data, escape or wait or whatever, don't block context
}
else
{
res = m_Q.front();
m_Q.pop();
}
m_M.unlock();
// do you magic with res here
}
在 main 的末尾 - 不要忘记使用 WaitForMultipleObjects
所有可能的示例都可以直接在 MSDN 中找到,因此对此有很好的评论。
第二部分:
好的,所以我相信 header 是 self-explainable,所以我会给你更多的源描述。在源代码的某个地方(甚至可以在 Constructor 中)创建线程 - 创建线程的方式可能不同但想法是相同的(在 win 中 - 线程在 posix 中创建后立即为 运行 u必须加入)。我相信你会在某个地方有一个函数来启动你所有的魔法,让我们称之为 MagicKicker()
在 posix 的情况下,在构造函数中创建线程并在 MagicKicker()
中加入 em,win - 在 MagicKicker()
中创建
你需要声明(在 header 中)你将实现线程函数的两个函数 ThreadFunc_Prod
和 ThreadFunc_Prod
,这里重要的魔法是你将传递引用你的 object 到这个函数(因为线程基本上是静态的)所以你可以轻松访问共享资源,如 queues,互斥锁等......
这些功能实际上是在做工作。实际上,您的代码中包含了您需要的所有内容,只需将其用作 Producer 中的添加例程:
int nData2Q = GetData(); // this is whatever you use to get your data
m_M.lock(); // locks mutex so nobody cant enter mutex
m_Q.push(nData2Q); // puts data from producer to share queue
m_M.unlock(); // unlock mutex so u can access mutex in your consumer
并将此添加到您的消费者:
int res;
m_M.lock(); // locks mutex so u cant access anything wrapped by mutex in producer
if (m_Q.empty()) // check if there is something in queue
{
// nothing in you queue yet OR already
// skip this thread run, you can i.e. sleep for some time to build queue
Sleep(100);
continue; // in case of while wrap
return; // in case that u r running some framework with threadloop
}
else // there is actually something
{
res = m_Q.front(); // get oldest element of queue
m_Q.pop(); // delete this element from queue
}
m_M.unlock(); // unlock mutex so producer can add new items to queue
// do you magic with res here
我的问题中提到的问题是Consumer thread
显示的图像没有包含完整的数据。 Consumer thread
显示的图像包含几个补丁,表明它无法获得 Producer thread
生成的完整数据。
ANSWER 其背后的原因是 Consumer thread
的 while loop
中声明了 Mat image
。在 while loop
中创建的 Mat
实例在第二轮 while loop
开始后被删除,因此 Producer thread
永远无法访问创建的 Mat image
的数据在 Consumer thread
.
解决方案:我应该这样做
struct ThreadSafeContainer
{
queue<Mat> safeContainer;
};
struct Producer
{
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
void run()
{
while(true)
{
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
mu.lock();
container->safeContainer.push(Mat);
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
struct Consumer
{
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
~Consumer()
{
}
void run()
{
while(true)
{
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
{
Mat image= container->safeContainer.front(); //The front of the queue contain the image
container->safeContainer.pop();
imshow("consumer image", image);
waitKey(33);
}
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
更新:我在下面的回答中提供了问题的原因及其解决方案。
我想为图像处理任务实现基于生产者-消费者方法的多线程。对于我的情况,Producer
线程应该抓取图像并将它们放入 container
而消费者线程应该从 Container
线程中提取图像。我认为我应该使用 queue
来实现 container
。
我想按照 container
的 实现以及在 Producer
线程中将传入图像放入其中感到非常困惑。
问题: 第一个 consumer thread
显示的图像不包含完整数据。并且,第二个 consumer thread
从不显示任何图像。可能是由于某些竞争情况或锁定情况导致第二个线程根本无法访问队列的数据。我已经尝试使用 Mutex
.
#include <vector>
#include <thread>
#include <memory>
#include <queue>
#include <opencv2/highgui.hpp>
#include <opencv2/core.hpp>
#include <opencv2/imgproc.hpp>
Mutex mu;
struct ThreadSafeContainer
{
queue<unsigned char*> safeContainer;
};
struct Producer
{
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
void run()
{
while(true)
{
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
unsigned char *pt_src = image.data;
mu.lock();
container->safeContainer.push(pt_src);
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
struct Consumer
{
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
~Consumer()
{
}
void run()
{
while(true)
{
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
{
unsigned char *ptr_consumer_Image;
ptr_consumer_Image = container->safeContainer.front(); //The front of the queue contain the pointer to the image data
container->safeContainer.pop();
Mat image(400, 400, CV_8UC3);
image.data = ptr_consumer_Image;
imshow("consumer image", image);
waitKey(33);
}
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
int main()
{
//Pointer object to the class containing a "container" which will help "Producer" and "Consumer" to put and take images
auto ptrObject_container = make_shared<ThreadSafeContainer>();
//Pointer object to the Producer...intialize the "container" variable of "Struct Producer" with the above created common "container"
auto ptrObject_producer = make_shared<Producer>(ptrObject_container);
//FIRST Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto first_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//SECOND Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto second_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//RUN producer thread
thread producerThread(&Producer::run, ptrObject_producer);
//RUN first thread of Consumer
thread first_consumerThread(&Consumer::run, first_ptrObject_consumer);
//RUN second thread of Consumer
thread second_consumerThread(&Consumer::run, second_ptrObject_consumer);
//JOIN all threads
producerThread.join();
first_consumerThread.join();
second_consumerThread.join();
return 0;
}
我在你的原始问题中没有看到实际问题,所以我会给你参考 material 我在大学课程中曾经实现过生产者-消费者。
http://cs360.byu.edu/static/lectures/winter-2014/semaphores.pdf
幻灯片 13 和 17 给出了生产者-消费者的好例子
我在我的 github 此处发布的实验室中使用了它: https://github.com/qzcx/Internet_Programming/tree/master/ThreadedMessageServer
如果您查看我的 server.cc,您可以看到我对生产者-消费者模式的实现。
请记住,使用此模式时您不能切换等待语句的顺序,否则您可能会陷入死锁。
希望这对您有所帮助。
编辑:
好的,下面是我上面链接的代码中消费者-生产者模式的总结。生产者消费者背后的想法是采用线程安全的方式将任务从 "producer" 线程传递到 "consumer" 工作线程。在我的示例中,要做的工作是处理客户端请求。生产者线程 (.serve()) 监视传入的套接字并将连接传递给消费者线程 (.handle()) 以处理传入的实际请求。此模式的所有代码都可以在 server.cc 文件(在 server.h 中有一些 declarations/imports)。
为了简洁起见,我省略了一些细节。一定要遍历每一行并了解发生了什么。查找我正在使用的库函数以及参数的含义。我在这里给了你很多帮助,但你还有很多工作要做才能获得全面的理解。
制片人:
就像我上面提到的,整个生产者线程都在 .serve() 函数中。它做了以下事情
- 初始化信号量。由于 OS 差异,这里有两个版本。我在 OS X 上编程,但必须在 Linux 上打开代码。由于信号量与 OS 相关联,了解如何在您的特定设置中使用信号量很重要。
- 它设置套接字供客户端对话。对您的申请不重要。
- 创建消费者线程。
- 监视客户端套接字并使用生产者模式将项目传递给消费者。此代码如下
在 .serve() 函数的底部,您可以看到以下代码:
while ((client = accept(server_,(struct sockaddr *)&client_addr,&clientlen)) > 0) {
sem_wait(clients_.e); //buffer check
sem_wait(clients_.s);
clients_.q->push(client);
sem_post(clients_.s);
sem_post(clients_.n); //produce
}
首先,您检查缓冲区信号量 "e" 以确保您的队列中有空间来放置请求。其次,获取队列的信号量"s"。然后将您的任务(在本例中为客户端连接)添加到队列中。释放队列的信号量。最后,使用信号量 "n" 向消费者发出信号。
消费者:
在 .handle() 方法中,您实际上只关心线程的开头。
while(1){
sem_wait(clients_.n); //consume
sem_wait(clients_.s);
client = clients_.q->front();
clients_.q->pop();
sem_post(clients_.s);
sem_post(clients_.e); //buffer free
//Handles the client requests until they disconnect.
}
消费者执行与生产者类似的操作,但方式相反。首先,消费者等待生产者在信号量 "n" 上发出信号。请记住,由于有多个消费者,因此最终获得该信号量的消费者完全是随机的。他们为它而战,但每个 sem_post 信号量只能让一个人通过这一点。其次,他们像生产者一样获取队列信号量。从队列中弹出第一项并释放信号量。最后,它们在缓冲区信号量 "e" 上发出信号,表明缓冲区中现在有更多空间。
免责声明:
我知道信号量的名字很糟糕。它们与我教授的幻灯片相匹配,因为那是我学到的。我认为他们代表以下内容:
- e 表示空:如果队列已满,此信号量会阻止生产者将更多项目推送到队列中。
- s for semaphore:我最不喜欢的。但是我的教授的风格是为每个共享数据结构都有一个结构。在这种情况下 "clients_" 是包含所有三个信号量和队列的结构。基本上这个信号量是为了确保没有两个线程同时接触同一个数据结构。
- n 表示队列中的项目数。
好的,所以让它尽可能简单。您将需要 2 个线程、互斥锁、queue 和 2 个线程处理函数。
Header.h
static DWORD WINAPI ThreadFunc_Prod(LPVOID lpParam);
static DWORD WINAPI ThreadFunc_Con(LPVOID lpParam);
HANDLE m_hThread[2];
queue<int> m_Q;
mutex m_M;
添加所有需要的东西,这些只是您需要的核心部分
Source.cpp
DWORD dwThreadId;
m_hThread[0] = CreateThread(NULL, 0, this->ThreadFunc_Prod, this, 0, &dwThreadId);
// same for 2nd thread
DWORD WINAPI Server::ThreadFunc_Prod(LPVOID lpParam)
{
cYourClass* o = (cYourClass*) lpParam;
int nData2Q = GetData(); // this is whatever you use to get your data
m_M.lock();
m_Q.push(nData2Q);
m_M.unlock();
}
DWORD WINAPI Server::ThreadFunc_Con(LPVOID lpParam)
{
cYourClass* o = (cYourClass*) lpParam;
int res;
m_M.lock();
if (m_Q.empty())
{
// bad, no data, escape or wait or whatever, don't block context
}
else
{
res = m_Q.front();
m_Q.pop();
}
m_M.unlock();
// do you magic with res here
}
在 main 的末尾 - 不要忘记使用 WaitForMultipleObjects
所有可能的示例都可以直接在 MSDN 中找到,因此对此有很好的评论。
第二部分:
好的,所以我相信 header 是 self-explainable,所以我会给你更多的源描述。在源代码的某个地方(甚至可以在 Constructor 中)创建线程 - 创建线程的方式可能不同但想法是相同的(在 win 中 - 线程在 posix 中创建后立即为 运行 u必须加入)。我相信你会在某个地方有一个函数来启动你所有的魔法,让我们称之为 MagicKicker()
在 posix 的情况下,在构造函数中创建线程并在 MagicKicker()
中加入 em,win - 在 MagicKicker()
你需要声明(在 header 中)你将实现线程函数的两个函数 ThreadFunc_Prod
和 ThreadFunc_Prod
,这里重要的魔法是你将传递引用你的 object 到这个函数(因为线程基本上是静态的)所以你可以轻松访问共享资源,如 queues,互斥锁等......
这些功能实际上是在做工作。实际上,您的代码中包含了您需要的所有内容,只需将其用作 Producer 中的添加例程:
int nData2Q = GetData(); // this is whatever you use to get your data
m_M.lock(); // locks mutex so nobody cant enter mutex
m_Q.push(nData2Q); // puts data from producer to share queue
m_M.unlock(); // unlock mutex so u can access mutex in your consumer
并将此添加到您的消费者:
int res;
m_M.lock(); // locks mutex so u cant access anything wrapped by mutex in producer
if (m_Q.empty()) // check if there is something in queue
{
// nothing in you queue yet OR already
// skip this thread run, you can i.e. sleep for some time to build queue
Sleep(100);
continue; // in case of while wrap
return; // in case that u r running some framework with threadloop
}
else // there is actually something
{
res = m_Q.front(); // get oldest element of queue
m_Q.pop(); // delete this element from queue
}
m_M.unlock(); // unlock mutex so producer can add new items to queue
// do you magic with res here
我的问题中提到的问题是Consumer thread
显示的图像没有包含完整的数据。 Consumer thread
显示的图像包含几个补丁,表明它无法获得 Producer thread
生成的完整数据。
ANSWER 其背后的原因是 Consumer thread
的 while loop
中声明了 Mat image
。在 while loop
中创建的 Mat
实例在第二轮 while loop
开始后被删除,因此 Producer thread
永远无法访问创建的 Mat image
的数据在 Consumer thread
.
解决方案:我应该这样做
struct ThreadSafeContainer
{
queue<Mat> safeContainer;
};
struct Producer
{
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
void run()
{
while(true)
{
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
mu.lock();
container->safeContainer.push(Mat);
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
struct Consumer
{
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
~Consumer()
{
}
void run()
{
while(true)
{
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
{
Mat image= container->safeContainer.front(); //The front of the queue contain the image
container->safeContainer.pop();
imshow("consumer image", image);
waitKey(33);
}
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};