线程池卡在等待状态
Thread pool stuck on wait condition
我在使用此线程池的 C++ 程序中遇到卡住问题 class:
class ThreadPool {
unsigned threadCount;
std::vector<std::thread> threads;
std::list<std::function<void(void)> > queue;
std::atomic_int jobs_left;
std::atomic_bool bailout;
std::atomic_bool finished;
std::condition_variable job_available_var;
std::condition_variable wait_var;
std::mutex wait_mutex;
std::mutex queue_mutex;
std::mutex mtx;
void Task() {
while (!bailout) {
next_job()();
--jobs_left;
wait_var.notify_one();
}
}
std::function<void(void)> next_job() {
std::function<void(void)> res;
std::unique_lock<std::mutex> job_lock(queue_mutex);
// Wait for a job if we don't have any.
job_available_var.wait(job_lock, [this]()->bool { return queue.size() || bailout; });
// Get job from the queue
mtx.lock();
if (!bailout) {
res = queue.front();
queue.pop_front();
}else {
// If we're bailing out, 'inject' a job into the queue to keep jobs_left accurate.
res = [] {};
++jobs_left;
}
mtx.unlock();
return res;
}
public:
ThreadPool(int c)
: threadCount(c)
, threads(threadCount)
, jobs_left(0)
, bailout(false)
, finished(false)
{
for (unsigned i = 0; i < threadCount; ++i)
threads[i] = std::move(std::thread([this, i] { this->Task(); }));
}
~ThreadPool() {
JoinAll();
}
void AddJob(std::function<void(void)> job) {
std::lock_guard<std::mutex> lock(queue_mutex);
queue.emplace_back(job);
++jobs_left;
job_available_var.notify_one();
}
void JoinAll(bool WaitForAll = true) {
if (!finished) {
if (WaitForAll) {
WaitAll();
}
// note that we're done, and wake up any thread that's
// waiting for a new job
bailout = true;
job_available_var.notify_all();
for (auto& x : threads)
if (x.joinable())
x.join();
finished = true;
}
}
void WaitAll() {
std::unique_lock<std::mutex> lk(wait_mutex);
if (jobs_left > 0) {
wait_var.wait(lk, [this] { return this->jobs_left == 0; });
}
lk.unlock();
}
};
gdb 说(当停止被阻止的执行时)卡住的是 (std::unique_lock&, ThreadPool::WaitAll()::{lambda()#1})+58>
我正在使用支持 c++14 (-std=c++1y) 的 g++ v5.3.0
如何避免这个问题?
更新
我编辑(重写)了 class:https://github.com/edoz90/threadpool/blob/master/ThreadPool.h
我认为您需要保持简单。
您似乎过多地使用了互斥锁。所以有 queue_mutex
并且您在添加和处理作业时使用它。
现在,当您等待读取队列时,为什么需要另一个单独的互斥体?
为什么不能只使用具有相同 queue_mutex
的条件变量来读取 WaitAll()
方法中的队列?
更新
我还建议在您的 WaitAll
中使用 lock_guard
而不是 unique_lock
。在特殊情况下,确实没有必要将 queue_mutex
锁定在 WaitAll
之外。如果你异常退出 WaitAll
它应该被释放。
更新2
忽略我上面的更新。由于您使用的是条件变量,因此您不能在 WaitAll
中使用锁守卫。但是,如果您使用的是 unique_lock
,请始终使用 try_to_lock
版本,尤其是当您有多个控制路径时
这里的问题是您的作业数存在竞争条件。您使用一个互斥量来保护队列,使用另一个互斥量来保护计数,这在语义上等同于队列大小。显然,第二个互斥体是多余的(并且使用不当),job_count
变量本身也是如此。
每个处理队列的方法都必须获得对它的独占访问权(甚至 JoinAll
来读取它的大小),所以你应该在三位代码中使用相同的 queue_mutex
篡改它(JoinAll
、AddJob
和 next_job
)。
顺便说一句,在 next_job()
处拆分代码在我看来非常尴尬。如果您在单个函数中处理辅助线程主体,则可以避免调用虚拟函数。
编辑:
正如其他评论已经指出的那样,您最好暂时将目光从代码上移开,重新考虑全局问题。
这里你唯一需要保护的是作业队列,所以你只需要一个互斥量。
然后是唤醒各个actor的问题,这需要一个条件变量,因为C++基本上不会给你任何其他可用的同步对象。
同样,您不需要一个以上的变量。终止线程池相当于将作业出队而不执行它们,这可以通过任何方式完成,无论是在工作线程本身(如果设置了终止标志则跳过执行)还是在 JoinAll
函数(清除获得独占访问权后的队列)。
最后但并非最不重要的一点是,一旦有人决定关闭池,您可能希望使 AddJob
无效,否则您可能会在有人继续提供新工作时陷入析构函数。
我在使用此线程池的 C++ 程序中遇到卡住问题 class:
class ThreadPool {
unsigned threadCount;
std::vector<std::thread> threads;
std::list<std::function<void(void)> > queue;
std::atomic_int jobs_left;
std::atomic_bool bailout;
std::atomic_bool finished;
std::condition_variable job_available_var;
std::condition_variable wait_var;
std::mutex wait_mutex;
std::mutex queue_mutex;
std::mutex mtx;
void Task() {
while (!bailout) {
next_job()();
--jobs_left;
wait_var.notify_one();
}
}
std::function<void(void)> next_job() {
std::function<void(void)> res;
std::unique_lock<std::mutex> job_lock(queue_mutex);
// Wait for a job if we don't have any.
job_available_var.wait(job_lock, [this]()->bool { return queue.size() || bailout; });
// Get job from the queue
mtx.lock();
if (!bailout) {
res = queue.front();
queue.pop_front();
}else {
// If we're bailing out, 'inject' a job into the queue to keep jobs_left accurate.
res = [] {};
++jobs_left;
}
mtx.unlock();
return res;
}
public:
ThreadPool(int c)
: threadCount(c)
, threads(threadCount)
, jobs_left(0)
, bailout(false)
, finished(false)
{
for (unsigned i = 0; i < threadCount; ++i)
threads[i] = std::move(std::thread([this, i] { this->Task(); }));
}
~ThreadPool() {
JoinAll();
}
void AddJob(std::function<void(void)> job) {
std::lock_guard<std::mutex> lock(queue_mutex);
queue.emplace_back(job);
++jobs_left;
job_available_var.notify_one();
}
void JoinAll(bool WaitForAll = true) {
if (!finished) {
if (WaitForAll) {
WaitAll();
}
// note that we're done, and wake up any thread that's
// waiting for a new job
bailout = true;
job_available_var.notify_all();
for (auto& x : threads)
if (x.joinable())
x.join();
finished = true;
}
}
void WaitAll() {
std::unique_lock<std::mutex> lk(wait_mutex);
if (jobs_left > 0) {
wait_var.wait(lk, [this] { return this->jobs_left == 0; });
}
lk.unlock();
}
};
gdb 说(当停止被阻止的执行时)卡住的是 (std::unique_lock&, ThreadPool::WaitAll()::{lambda()#1})+58>
我正在使用支持 c++14 (-std=c++1y) 的 g++ v5.3.0
如何避免这个问题?
更新
我编辑(重写)了 class:https://github.com/edoz90/threadpool/blob/master/ThreadPool.h
我认为您需要保持简单。
您似乎过多地使用了互斥锁。所以有 queue_mutex
并且您在添加和处理作业时使用它。
现在,当您等待读取队列时,为什么需要另一个单独的互斥体?
为什么不能只使用具有相同 queue_mutex
的条件变量来读取 WaitAll()
方法中的队列?
更新
我还建议在您的 WaitAll
中使用 lock_guard
而不是 unique_lock
。在特殊情况下,确实没有必要将 queue_mutex
锁定在 WaitAll
之外。如果你异常退出 WaitAll
它应该被释放。
更新2
忽略我上面的更新。由于您使用的是条件变量,因此您不能在 WaitAll
中使用锁守卫。但是,如果您使用的是 unique_lock
,请始终使用 try_to_lock
版本,尤其是当您有多个控制路径时
这里的问题是您的作业数存在竞争条件。您使用一个互斥量来保护队列,使用另一个互斥量来保护计数,这在语义上等同于队列大小。显然,第二个互斥体是多余的(并且使用不当),job_count
变量本身也是如此。
每个处理队列的方法都必须获得对它的独占访问权(甚至 JoinAll
来读取它的大小),所以你应该在三位代码中使用相同的 queue_mutex
篡改它(JoinAll
、AddJob
和 next_job
)。
顺便说一句,在 next_job()
处拆分代码在我看来非常尴尬。如果您在单个函数中处理辅助线程主体,则可以避免调用虚拟函数。
编辑:
正如其他评论已经指出的那样,您最好暂时将目光从代码上移开,重新考虑全局问题。
这里你唯一需要保护的是作业队列,所以你只需要一个互斥量。
然后是唤醒各个actor的问题,这需要一个条件变量,因为C++基本上不会给你任何其他可用的同步对象。
同样,您不需要一个以上的变量。终止线程池相当于将作业出队而不执行它们,这可以通过任何方式完成,无论是在工作线程本身(如果设置了终止标志则跳过执行)还是在 JoinAll
函数(清除获得独占访问权后的队列)。
最后但并非最不重要的一点是,一旦有人决定关闭池,您可能希望使 AddJob
无效,否则您可能会在有人继续提供新工作时陷入析构函数。