C++11 线程挂在锁定互斥锁上

C++11 thread hangs on locking mutex

使用 C++11 std::threadstd::mutex,我正在编写一个简单的工作线程。 但是,我在锁定 std::mutex 时遇到了一个奇怪的挂起问题,看起来两个线程(主线程和工作线程)都试图锁定互斥锁,但都被阻塞了。

这是完整的代码

#include <thread>
#include <condition_variable>
#include <memory>
#include <iostream>
#include <list>
std::condition_variable cv;
std::mutex m;
std::thread t;
bool shouldExit = false;
std::list<int> jobs;

void thread_func()
{
  std::unique_lock<std::mutex> lock(m);
  while (!shouldExit) {
    while (jobs.empty() && !shouldExit) {
      cv.wait(lock);
    }
    // Do some stuff
    if (jobs.empty()) {
      continue;
    }
    // Get a job and do something with it
    if (!lock.owns_lock()) {
      lock.lock();  // <<<< Worker thread hang here
    }
    auto j = std::move(jobs.front());
    jobs.pop_front();
    lock.unlock();
    std::cout << "Do something with job " << j << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

int main()
{
  t = std::thread(thread_func);

  for (int i = 1; i < 100; ++i) {
    std::cout << "Push to job " << i << std::endl;
    {
    std::lock_guard<std::mutex> lock(m); // <<<< main thread hang here
    jobs.push_back(i);
    cv.notify_one();
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }

  // To wait for thread exit
  shouldExit = true;
  cv.notify_one();
  t.join();
  return 0;
}

我正在 Ubuntu 14.04

上使用以下命令编译代码
g++ -std=c++11 -g -O0 -pthread -o testthread testthread.cpp

执行结果一般是这样的:

$ ./testthread
Push to job 1
Do something with job 1
Push to job 2
Do something with job 2
Push to job 3
Push to job 4

有趣的是,当我将主线程中 sleeping-1ms 的一行代码移动到下面的 lock_guard 中时,问题就消失了。

  for (int i = 1; i < 100; ++i) {
    std::cout << "Push to job " << i << std::endl;
    {
    std::lock_guard<std::mutex> lock(m);
    jobs.push_back(i);
    cv.notify_one();
    std::this_thread::sleep_for(std::chrono::milliseconds(1)); // Moved into lock_guard
    }
  }

我不明白为什么。 你能帮忙解释一下代码的行为以及我做错了什么吗?

[更新] 我知道以某种方式重写工作线程可以解决这个问题。但是我仍然想知道在原始代码中当两个线程锁定互斥锁但都被阻塞时到底发生了什么。

尝试像这样重写您的工作线程:

void thread_func()
{
  while (!shouldExit) 
  {
    int j ;
    {  
      std::unique_lock<std::mutex> lock(m);  // lock object inside while
      while (jobs.empty() && !shouldExit) {
        cv.wait(lock);
      }
      // Do some stuff
      if (jobs.empty()) {
        continue;
      }
      j = jobs.front(); 
      jobs.pop_front();
    } // lock goes out of scope
    std::cout << "Do something with job " << j << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

你可以看到j不再被移动,你可以将int j ;之后的部分封装在一个函数中以获得相同的效果。

重写的主要思想是避免混淆 lock 的成员函数,并通过让 constructor/destructor 进行锁定作业来使用它。

Seems to work...

你这里有问题:

   while (jobs.empty() && !shouldExit) {
      cv.wait(lock);
    }
    // Do some stuff
    if (jobs.empty()) {
      continue;
    }

当你醒来时,你拥有这把锁。但是,通过调用 continue 你失去了释放它的任何机会。

lock 未锁定的情况下调用 cv.wait 是未定义的行为。添加此断言:

while (!shouldExit) {
  assert(lock.owns_lock());    // <------ add this
  while (jobs.empty() && !shouldExit) {
    cv.wait(lock);
  }

libc++ 将从 wait if !lock.owns_lock() 抛出,但我不知道其他实现会做什么。

您的代码中存在严重且经典的错误....

首先请看annotated/numbered评论。我会参考他们

void thread_func()
{
  std::unique_lock<std::mutex> lock(m);        // <---- {1}
  while (!shouldExit) {                        // <---- {2}
    while (jobs.empty() && !shouldExit) {      // <---- {3}
      cv.wait(lock);
    }
    // Do some stuff
    if (jobs.empty()) {
      continue;
    }

    if (!lock.owns_lock()) {
      lock.lock();                             // <---- {4}
    }
    auto j = std::move(jobs.front());
    jobs.pop_front();
    lock.unlock();                             // <---- {5}
    std::cout << "Do something with job " << j << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

{1} 这很好......看到你在 {5}

中击败了这个目标

{2} shouldExit 应该是一个原子布尔值。否则你会有竞争条件

{3} 在执行的某个时刻,将在不持有锁的情况下测试此条件,请参阅 {5} 中的解锁语句。因此,您还有另一个竞争条件。

{4} 对于解锁的互斥锁,在您测试锁和发出锁之间,互斥锁可能会被获取,导致它永远在这里等待。

{5} 为下一次循环的执行解锁互斥量...将发生严重的竞争条件和死锁。

正在修补您当前的解决方案...{糟糕但有效的补丁...进一步阅读}

只需将 lock.lock() 添加到您的 thread_func()

的最后一行

像这样....

void thread_func()
{
    .....more code omitted
    ........
    lock.unlock();                            
    std::cout << "Do something with job " << j << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));

    lock.lock();   //YOUR NEW LINE
  }
}

添加将循环恢复到互斥锁在进入前被锁定的原始状态....请注意,还有另一个代码路径可以到达循环的入口...您 continue 语句...每当std::condition_variable::wait() returns,锁总是被重新锁定,因此仍然保持不变量...

现在你的代码可以工作了!!耶!!! ……但还是很臭! std::cout 是线程安全的,但输出不同步,因此,您可能有交错的字符...

搁置 std::cout 的问题 如何正确处理?检查此代码(也请参阅评论)

void thread_func()
{
    std::unique_lock<std::mutex> lock(m);
    while (!shouldExit)    // this is redundant, so I removed it in the final code
    {
        while (jobs.empty() && !shouldExit)
        {
            cv.wait(lock, []{ return !jobs.empty(); } );
        }
        // Do some stuff
        auto j = std::move(jobs.front());
        jobs.pop_front();
        //cout is thread-safe but not synchronized
        //std::cout << "Do something with job " << j << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}

在我所知道的大多数常见情况下,最好在 std::condition_variable::wait().

中测试您的 "ready to proceed" 条件

为您整理一下....这是一个更好的版本

#include <thread>
#include <condition_variable>
#include <memory>
#include <iostream>
#include <list>
#include <atomic>

std::condition_variable cv;
std::mutex m;
std::mutex mxa;   //for std::cout locking
std::thread t;
std::atomic<bool> shouldExit;
std::list<int> jobs;

namespace detail
{

    std::ostream& safe_print()
    {
        return std::cout;
    }

    template<typename T, typename... Args>
    std::ostream& safe_print(T&& t, Args&&... args)
    {
        std::cout << t;
        return safe_print(std::forward<Args>(args)...);
    }
}

template<typename... Args>
std::ostream& println(Args&&... args)
{
    std::lock_guard<std::mutex> lck(mxa);
    auto&& x = detail::safe_print(std::forward<Args>(args)...);
    std::cout << std::endl;
    return x;
}

void thread_func()
{
    std::unique_lock<std::mutex> lock(m);
    while (jobs.empty() && !shouldExit)
    {
        cv.wait(lock, []{ return !jobs.empty(); } );
    }
    // Do some stuff
    auto j = std::move(jobs.front());
    jobs.pop_front();
    //std::cout << "Do something with job " << j << std::endl;
    println("Do something with job ", j);
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

int main()
{
    shouldExit = false;
    //safe_print("This is really funny ", 43, '\n');
    t = std::thread(thread_func);

    for (int i = 1; i < 100; ++i)
    {
        //std::cout << "Push to job " << i << std::endl;
        println("Push to Job ", i);
        {
            std::lock_guard<std::mutex> lock(m); // <<<< main thread doesn't hang here again
            jobs.push_back(i);
            cv.notify_one();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    // To wait for thread exit
    shouldExit = true;
    cv.notify_one();
    t.join();
    return 0;
}