在 Boost 中同步线程

Syncing Threads in Boost

我正在尝试创建一个创建一个主线程和 10 个从属线程的应用程序。我想在主线程 运行 之后 运行 从属线程一次。因此对于每个主线程执行,每个从线程将执行一次。我试图用两个不同的 conditional variables 来处理这个问题。因此,一个用于从线程,因此它们可以等到主线程通知它们,另一个 conditional variable 用于主线程,在每个子线程完成其任务后发出信号,因此主线程可以检查是否所有从线程完成与否。代码如下:

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>

std::vector<boost::thread*> threads;

std::vector<boost::mutex*> data_ready_mutex;
std::vector<boost::condition_variable*> cond;
std::vector<bool> data_ready;
std::vector<int> num_run;

boost::mutex check_finish_mutex;
std::vector<bool> finished;

boost::atomic<int> data;
boost::atomic<int> next_thread_id;

boost::mutex finished_task_mutex;
boost::condition_variable finished_task_cond;
bool finished_task = false;

void signal_finished(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(finished_task_mutex);
    finished[id] = true;
    finished_task = true;
  }
  finished_task_cond.notify_all();
}

void signal_slave(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(*data_ready_mutex[id]);

    data_ready[id] = true;
  }
  cond[id]->notify_all();
}

void slave_therad()
{
  int id = next_thread_id++;

  std::cout << "( " << id << " ) slave_thread created\n";
  while (true)
  {
    boost::unique_lock<boost::mutex> lock(*data_ready_mutex[id]);
    while (!data_ready[id])
    {
      cond[id]->wait(lock);
    }

    finished[id] = false;

    data_ready[id] = false;

    data++;

    num_run[id]++;

    signal_finished(id);
  }
}

void main()
{
  size_t nThreads = 10;

  data_ready_mutex.resize(nThreads);
  cond.resize(nThreads);
  data_ready.resize(nThreads);
  finished.resize(nThreads);
  num_run.resize(nThreads, 0);
  for (size_t i = 0; i < nThreads; i++)
  {
    data_ready_mutex[i] = new boost::mutex();
    cond[i] = new boost::condition_variable();
    data_ready[i] = false;
    finished[i] = false;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads.push_back(new boost::thread(slave_therad));
  }

  while (true)
  {
    clock_t start_time = clock();

    for (size_t i = 0; i < threads.size(); i++)
      signal_slave(static_cast<int>(i));

    while (true)
    {
      boost::unique_lock<boost::mutex> lock(finished_task_mutex);
      while (!finished_task)
      {
        finished_task_cond.wait(lock);
      }
      finished_task = false;

      size_t i = 0;
      for (; i < finished.size(); i++)
      {
        if (!finished[i]) break;
      }
      if (i == finished.size()) break;
    }

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << static_cast<float>(end_time - start_time) / CLOCKS_PER_SEC << std::endl;

    for (size_t i = 0; i < threads.size(); i++)
      finished[i] = false;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }
}

问题是代码在某处停止并陷入死锁。

另外,我尝试改变实现方式。因此,我使用了一个 atomic<int> 来计算已完成任务的线程数,并在主线程中检查线程数是否等于已更新自身的线程数,但此方法也卡在某个地方并且陷入僵局。 代码可以在这里找到:

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>

std::vector<boost::thread*> threads;                //!< Slave Threads array

std::vector<boost::mutex*>  data_ready_mutex;       //!< Mutex to guard the data_ready 
std::vector<bool>           data_ready;             //!< Shows if the data is ready for the slave thread or not.
std::vector<boost::condition_variable*> cond;       //!< conditional variable to wait on data being ready for the slave thread.

std::vector<int> num_run;                           //!< Stores the number of times each slave thread is run.

boost::atomic<int> data;                            //!< Stores the data processed by each slave thread
boost::atomic<int> next_thread_id;                  //!< id for the next thread (used for giving an id from 0,..., nThreads-1

boost::atomic<int> num_threads_done;                //!< Stores the number of slave threads which has finished their task

//! Signals a slave thread to start its task
void signal_slave(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(*data_ready_mutex[id]);
    data_ready[id] = true;
  }
  cond[id]->notify_all();
}

//! Slave thread function
void slave_therad()
{
  // assign an id to the current slave_thread
  int id = next_thread_id++;

  std::cout << "( " << id << " ) slave_thread created\n";
  while (true)
  {
    // wait for a signal from the main thread
    boost::unique_lock<boost::mutex> lock(*data_ready_mutex[id]);
    while (!data_ready[id])
    {
      cond[id]->wait(lock);
    }

    // make the data not ready, so the loop is not going to run without the main thread signal after the thread is done.
    data_ready[id] = false;

    // TASK for SLAVE THREAD
    data++;

    // Increase the number of times the thread is run
    num_run[id]++;

    // Increase the number of threads which has finished their tasks.
    num_threads_done++;

  }
}

void main()
{
  size_t nThreads = 10;

  // creating the data ready mutexes, conditional variables, data_ready variable (bools), num_runs array.
  data_ready_mutex.resize(nThreads);
  cond.resize(nThreads);
  data_ready.resize(nThreads);
  num_run.resize(nThreads, 0);
  for (size_t i = 0; i < nThreads; i++)
  {
    data_ready_mutex[i] = new boost::mutex();
    cond[i] = new boost::condition_variable();
    data_ready[i] = false;
  }

  // Creating the slave threads
  for (size_t i = 0; i < nThreads; i++)
  {
    threads.push_back(new boost::thread(slave_therad));
  }

  // Main Thread Body
  while (true)
  {
    clock_t start_time = clock();

    // Reset the number of threads which are done.
    num_threads_done = 0;

    // Signals the slave threads to start doing their task.
    for (size_t i = 0; i < threads.size(); i++)
      signal_slave(static_cast<int>(i));

    // Wait until all the slave threads are done.
    while (true)
      if (num_threads_done == threads.size()) break;

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << static_cast<float>(end_time - start_time) / CLOCKS_PER_SEC << std::endl;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }

}

甚至,我尝试用障碍解决问题,但它并没有解决我的问题。代码如下:

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>

boost::barrier* barrier;                            //!< barrier to make sure all the slave threads are done their tasks.

std::vector<boost::thread*> threads;

std::vector<boost::mutex*>  data_ready_mutex;       //!< Mutex to guard the data_ready 
std::vector<bool>           data_ready;             //!< Shows if the data is ready for the slave thread or not.
std::vector<boost::condition_variable*> cond;       //!< conditional variable to wait on data being ready for the slave thread.

std::vector<int> num_run;                           //!< Stores the number of times each slave thread is run.

boost::atomic<int> data;                            //!< Stores the data processed by each slave thread
boost::atomic<int> next_thread_id;                  //!< id for the next thread (used for giving an id from 0,..., nThreads-1

boost::atomic<int> num_threads_done;                //!< Stores the number of slave threads which has finished their task

std::vector<bool> finished;                         //!< Array which stores if all the slave threads are done or not.
boost::mutex finished_task_mutex;                   //!< mutex to guard the finished_task variable
boost::condition_variable finished_task_cond;       //!< Conditional variable to wait for all the threads to finish they tasks.
boost::atomic<bool> finished_task(false);           //!< Variable which stores if the task of slave_threads are finished or not.

void signal_finished(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(finished_task_mutex);
    finished[id] = true;
    finished_task = true;
  }
  finished_task_cond.notify_all();
}

void signal_slave(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(*data_ready_mutex[id]);

    data_ready[id] = true;
  }
  cond[id]->notify_all();
}

void slave_therad()
{
  int id = next_thread_id++;

  std::cout << "( " << id << " ) slave_thread created\n";
  while (true)
  {
    boost::unique_lock<boost::mutex> lock(*data_ready_mutex[id]);
    while (!data_ready[id])
    {
      cond[id]->wait(lock);
    }

    finished[id] = false;

    data_ready[id] = false;

    data++;

    num_run[id]++;

    barrier->wait();

    signal_finished(id);
  }
}

void main()
{
  size_t nThreads = 10;

  data_ready_mutex.resize(nThreads);
  cond.resize(nThreads);
  data_ready.resize(nThreads);
  finished.resize(nThreads);
  num_run.resize(nThreads, 0);
  for (size_t i = 0; i < nThreads; i++)
  {
    data_ready_mutex[i] = new boost::mutex();
    cond[i] = new boost::condition_variable();
    data_ready[i] = false;
    finished[i] = false;
  }

  barrier = new boost::barrier(nThreads);

  for (size_t i = 0; i < nThreads; i++)
  {
    threads.push_back(new boost::thread(slave_therad));
  }

  while (true)
  {
    clock_t start_time = clock();

    for (size_t i = 0; i < threads.size(); i++)
      signal_slave(static_cast<int>(i));

    while (true)
    {
      boost::unique_lock<boost::mutex> lock(finished_task_mutex);
      while (!finished_task)
      {
        finished_task_cond.wait(lock);
      }
      finished_task = false;
      break;
    }

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << static_cast<float>(end_time - start_time) / CLOCKS_PER_SEC << std::endl;

    for (size_t i = 0; i < threads.size(); i++)
      finished[i] = false;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }
}

[更新] 所以,我简单地在结构中使用了 mutexconditional variablesdata_ready,现在代码可以正常工作了。我认为使用 pointer to mutex 等存在错误。代码如下:

//#define SYNC_WITH_BARRIER
#define SYNC_WITH_ATOMICS

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>
#include <boost/ptr_container/ptr_vector.hpp>

std::vector<boost::thread*> threads;

boost::atomic<int> next_thread_id(0);

boost::mutex finished_task_mutex;
boost::condition_variable finished_task_cond;
bool finished_task = false;

boost::atomic<int> num_finished_tasks(0);

struct Work
{
  Work(boost::barrier& _barrier) : b(&_barrier)
  {

  }

  boost::barrier*           b;
  boost::mutex              data_ready_mutex;
  boost::condition_variable data_ready_cond;
  bool                      data_ready;
  int                       num_run;
  boost::atomic<int>        data;
  bool                      finished;

  void signal_slave()
  {
    {
      boost::lock_guard<boost::mutex> lock(data_ready_mutex);
      data_ready = true;
      data_ready_cond.notify_all();
    }    
  }

  void slave_therad()
  {
    int id = next_thread_id++;

    std::cout << "( " << id << " ) slave_thread created\n";
    while (true)
    {
      boost::unique_lock<boost::mutex> lock(data_ready_mutex);
      while (!data_ready)
      {
        data_ready_cond.wait(lock);
      }

      finished = false;

      data_ready = false;

      data++;

      num_run++;

#ifdef SYNC_WITH_BARRIER
      b->count_down_and_wait();
#else 
#ifdef SYNC_WITH_ATOMICS
      num_finished_tasks++;
#endif
#endif
    }
  }

};

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>

using hrc = boost::chrono::high_resolution_clock;

void main()
{
  size_t nThreads = 10;

  boost::thread_group tg;
  boost::ptr_vector<Work> work_items;
  work_items.reserve(nThreads);

  boost::barrier finish(nThreads + 1); // one for the main thread

  for (size_t i = 0; i < nThreads; i++)
  {
    work_items.push_back(new Work(finish));
    tg.create_thread(boost::bind(&Work::slave_therad, boost::ref(work_items.back()))); 
  }

  while (true)
  {
    auto start_time = hrc::now();

    num_finished_tasks = 0;

    for (size_t i = 0; i < work_items.size(); i++)
      work_items[i].signal_slave();

#ifdef SYNC_WITH_BARRIER
    finish.count_down_and_wait();
#else
#ifdef SYNC_WITH_ATOMICS
    while (true) if (num_finished_tasks == work_items.size()) break;
#endif
#endif

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << hrc::now() - start_time << std::endl;

  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }
}

@sehe even with barrier, it stuck in deadlock. – mmostajab

既然你没有展示你在那里做什么,让我通过整合你收到的所有建议中的大部分来给你一个启动提升:

Live On Coliru

#include <boost/atomic.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <vector>

namespace /*static*/ {
    boost::atomic<int> data;
    boost::atomic<int> num_threads_done;

    struct Work {
        void signal_slave()
        {
            boost::lock_guard<boost::mutex> lock(data_ready_mutex);
            data_ready = true;
            cond.notify_all();
        }

        void slave_thread()
        {
            static boost::atomic_int _id_gen(0);
            id = _id_gen++;

            std::cout << "(" << id << ") slave_thread created\n";
            while (true) {

                boost::unique_lock<boost::mutex> lock(data_ready_mutex);
                cond.wait(lock, [&]{ return data_ready; });

                data_ready = false;

                data++;

                num_run++;

                num_threads_done++;
            }
        }

      private:
        int id          = 0;
        bool data_ready = false;
        int  num_run    = 0;

        boost::mutex data_ready_mutex;
        boost::condition_variable cond;

    };

}

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>

using hrc = boost::chrono::high_resolution_clock;

int main()
{
    boost::thread_group tg;

    size_t nThreads = 10;

    std::vector<Work> works(nThreads);

    for (size_t i = 0; i < nThreads; i++) {
        tg.create_thread(boost::bind(&Work::slave_thread, boost::ref(works[i])));
    }

    while (true) {
        auto start_time = hrc::now();

        for (auto& w : works)
            w.signal_slave();

        std::cout << "Elapsed Time = " << (hrc::now()-start_time) << std::endl;
    }

    tg.join_all();
}

记住,我不知道你想在这里达到什么目的。添加障碍我想到了这一点:how to use boost barrier

我试图更改@sehe 的答案,所以它正好解决了我正在寻找的问题,我实现了这个代码:

#include <boost/atomic.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <vector>

namespace /*static*/ {
  boost::atomic<int> data;

  boost::barrier*           slave_thread_finished_barrier;
  boost::mutex              slave_thread_finished_mutex; 
  boost::condition_variable slave_thread_finished_cond;
  bool                      slave_thread_finished = false;

  struct Work {
    void signal_slave()
    {
      boost::lock_guard<boost::mutex> lock(data_ready_mutex);
      data_ready = true;
      cond.notify_all();
    }

    void slave_thread()
    {
      static boost::atomic_int _id_gen(0);
      id = _id_gen++;

      std::cout << "(" << id << ") slave_thread created\n";
      while (true) {

        boost::unique_lock<boost::mutex> lock(data_ready_mutex);
        cond.wait(lock, [&]{ return data_ready; });

        data_ready = false;

        data++;

        num_run++;

        slave_thread_finished_barrier->wait();

        // signaling the main thread that the slave threads are done.
        if (id == 0)
        {
          boost::lock_guard<boost::mutex> lock(slave_thread_finished_mutex);
          slave_thread_finished = true;
          slave_thread_finished_cond.notify_one();
        }

      }
    }

  private:
    int id = 0;
    bool data_ready = false;
    int  num_run = 0;

    boost::mutex data_ready_mutex;
    boost::condition_variable cond;

  };

}

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>

using hrc = boost::chrono::high_resolution_clock;

int main()
{
  boost::thread_group tg;

  size_t nThreads = 10;

  slave_thread_finished_barrier = new boost::barrier(nThreads);
  std::vector<Work> works(nThreads);

  for (size_t i = 0; i < nThreads; i++) {
    tg.create_thread(boost::bind(&Work::slave_thread, boost::ref(works[i])));
  }

  while (true) {
    auto start_time = hrc::now();

    for (auto& w : works)
      w.signal_slave();

    // Wait for slave threads to finish.
    boost::unique_lock<boost::mutex> lock(slave_thread_finished_mutex);
    slave_thread_finished_cond.wait(lock, [&]{ return slave_thread_finished; });
    slave_thread_finished = false;

    std::cout << "Elapsed Time = " << (hrc::now() - start_time) << std::endl;
  }

  tg.join_all();
}