线程池卡在等待状态

Thread pool stuck on wait condition

我在使用此线程池的 C++ 程序中遇到卡住问题 class:

class ThreadPool {

    unsigned threadCount;
    std::vector<std::thread> threads;
    std::list<std::function<void(void)> > queue;

    std::atomic_int jobs_left;
    std::atomic_bool bailout;
    std::atomic_bool finished;
    std::condition_variable job_available_var;
    std::condition_variable wait_var;
    std::mutex wait_mutex;
    std::mutex queue_mutex;
    std::mutex mtx;

    void Task() {
        while (!bailout) {
            next_job()();
            --jobs_left;
            wait_var.notify_one();
       }
    }

    std::function<void(void)> next_job() {
        std::function<void(void)> res;
        std::unique_lock<std::mutex> job_lock(queue_mutex);

        // Wait for a job if we don't have any.
        job_available_var.wait(job_lock, [this]()->bool { return queue.size() || bailout; });

        // Get job from the queue
        mtx.lock();
        if (!bailout) {
            res = queue.front();
            queue.pop_front();
        }else {
            // If we're bailing out, 'inject' a job into the queue to keep jobs_left accurate.
            res = [] {};
            ++jobs_left;
        }
        mtx.unlock();
        return res;
    }

public:
    ThreadPool(int c)
        : threadCount(c)
        , threads(threadCount)
        , jobs_left(0)
        , bailout(false)
        , finished(false)
    {
        for (unsigned i = 0; i < threadCount; ++i)
            threads[i] = std::move(std::thread([this, i] { this->Task(); }));
    }

    ~ThreadPool() {
        JoinAll();
    } 

    void AddJob(std::function<void(void)> job) {
        std::lock_guard<std::mutex> lock(queue_mutex);
        queue.emplace_back(job);
        ++jobs_left;
        job_available_var.notify_one();
    }

    void JoinAll(bool WaitForAll = true) {
        if (!finished) {
            if (WaitForAll) {
                WaitAll();
            }

            // note that we're done, and wake up any thread that's
            // waiting for a new job
            bailout = true;
            job_available_var.notify_all();

            for (auto& x : threads)
                if (x.joinable())
                    x.join();
            finished = true;
        }
    }

    void WaitAll() {
        std::unique_lock<std::mutex> lk(wait_mutex);
        if (jobs_left > 0) {
            wait_var.wait(lk, [this] { return this->jobs_left == 0; });
        }
        lk.unlock();
    }
};

gdb 说(当停止被阻止的执行时)卡住的是 (std::unique_lock&, ThreadPool::WaitAll()::{lambda()#1})+58>

我正在使用支持 c++14 (-std=c++1y) 的 g++ v5.3.0

如何避免这个问题?

更新

我编辑(重写)了 class:https://github.com/edoz90/threadpool/blob/master/ThreadPool.h

我认为您需要保持简单。 您似乎过多地使用了互斥锁。所以有 queue_mutex 并且您在添加和处理作业时使用它。

现在,当您等待读取队列时,为什么需要另一个单独的互斥体?

为什么不能只使用具有相同 queue_mutex 的条件变量来读取 WaitAll() 方法中的队列?

更新

我还建议在您的 WaitAll 中使用 lock_guard 而不是 unique_lock。在特殊情况下,确实没有必要将 queue_mutex 锁定在 WaitAll 之外。如果你异常退出 WaitAll 它应该被释放。

更新2

忽略我上面的更新。由于您使用的是条件变量,因此您不能在 WaitAll 中使用锁守卫。但是,如果您使用的是 unique_lock,请始终使用 try_to_lock 版本,尤其是当您有多个控制路径时

这里的问题是您的作业数存在竞争条件。您使用一个互斥量来保护队列,使用另一个互斥量来保护计数,这在语义上等同于队列大小。显然,第二个互斥体是多余的(并且使用不当),job_count 变量本身也是如此。

每个处理队列的方法都必须获得对它的独占访问权(甚至 JoinAll 来读取它的大小),所以你应该在三位代码中使用相同的 queue_mutex篡改它(JoinAllAddJobnext_job)。

顺便说一句,在 next_job() 处拆分代码在我看来非常尴尬。如果您在单个函数中处理辅助线程主体,则可以避免调用虚拟函数。

编辑:

正如其他评论已经指出的那样,您最好暂时将目光从代码上移开,重新考虑全局问题。

这里你唯一需要保护的是作业队列,所以你只需要一个互斥量。

然后是唤醒各个actor的问题,这需要一个条件变量,因为C++基本上不会给你任何其他可用的同步对象。

同样,您不需要一个以上的变量。终止线程池相当于将作业出队而不执行它们,这可以通过任何方式完成,无论是在工作线程本身(如果设置了终止标志则跳过执行)还是在 JoinAll 函数(清除获得独占访问权后的队列)。

最后但并非最不重要的一点是,一旦有人决定关闭池,您可能希望使 AddJob 无效,否则您可能会在有人继续提供新工作时陷入析构函数。