Boost线程条件变量三线程

Boost thread condition variable three thread

我有一个 main 设置两个变量的值并等待两个线程打印该值。然后更新值等等...

对于主线程和一个线程,它可以工作,但对于两个则不行。 这是我的代码:

void t(int id){

    bool block_me = true;
    while(1)
    {
        {
            boost::mutex::scoped_lock lock(m);
            while(!start_t || thread_executed > 0)
                start_thread.wait(lock);
        }

        // Print myself
        cout<<id<<endl;

        {
            boost::mutex::scoped_lock lock(m);
            thread_executed++;
            if(thread_executed == 2){
                start_main.notify_one();
            }
        }       
    }
}

int main(){

    thread_executed = 0;
    start_t = false;

    boost::thread t1(boost::bind(&t, 1));
    boost::thread t2(boost::bind(&t, 2));

    for(int i = 1; i < 10; i++){
        cout<<"i = "<<i<<endl;
        {
            boost::mutex::scoped_lock lock(m);  
            start_t = true;
            thread_executed = 0;
            start_thread.notify_all();  

            while(thread_executed != 2){
                start_main.wait(lock);
            }
            start_t = false;
            thread_executed = 0;
            start_thread.notify_all();
        }       
    }
    return 0;
}

这里最有可能发生的是,第一个线程运行改变了变量"published",然后坐着等待,第二个线程只是坐着等待发布0,但情况永远不会是这样,因为主线程正在等待它变为 2 以将其更改回 0。

要以有效的方式实施它,可以做的事情很少:

  1. 让线程将你需要打印的东西入队,然后另一个线程会从队列中取出东西并打印它们(或者将它们写入磁盘,或者任何你真正需要做的事情)他们)。这是一种非常常见的模式,可以简化整体实施。可能会有约束,然后你可能无法实现它。

  2. 将 "states" 添加到您的线程,这样当您处于 "PRINT" 状态时,当您处于 [=40] 状态时,线程将被唤醒并打印值=] 声明线程将在条件变量中等待,主线程将 "process" 值。您可以使用 boost barriers 等到所有互斥锁都完成工作后再更改状态。

几点建议:

  • 如果您在线程中读取它们,请不要在互斥保护之外更改变量,变量 "published" 和 "termiante_thread" 当您将它们更改为例如停止线程。

  • 不要使用睡眠,你不需要它们来实现一个工作示例,它们是实施中出现问题的症状。

I've modified my original code... mmm, queue for??

我们来演示一下!

我概括了一点,因为您基本上有两个具有共享条件变量的单元素队列。

更容易推理的是两个具有不同条件和锁的独立队列。这会立即解开它们以进行同步,如果您将容量定义为 > 1,则在主线程需要放慢速度之前,工作人员可能会积压一些排队的项目。

Live On Coliru

#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <iostream>

static constexpr size_t queue_capacity = 1;

struct Processor {

    Processor(int id) : id_(id) {}

    void work() {
        while (running) {
            int value;
            {   // pop under lock
                std::unique_lock<std::mutex> lk(mx_);
                cv_.wait(lk, [this] { return !running || !queue_.empty(); });

                if (!running)
                    break;

                // invariant: queue cannot be empty here
                value = queue_.front();
                queue_.pop();
                cv_.notify_one();
            }
            std::cout << "work " << id_ << ": " << value << "\n";
        }
    }

    void enqueue(int value) {
        std::unique_lock<std::mutex> lk(mx_);
        cv_.wait(lk, [this] { return !running || queue_.size() < queue_capacity; });

        if (running) {
            queue_.push(value);
            cv_.notify_one();
        }
    }

    ~Processor() {
        {
            std::unique_lock<std::mutex> lk(mx_);
            cv_.notify_one();
            running = false;
        }
        if (th_.joinable())
            th_.join();
    }
private:
    bool running = true;
    std::mutex mx_;
    std::condition_variable cv_;
    std::thread th_ {std::bind(&Processor::work, this)};
    int id_;

    std::queue<int> queue_;
};

int main() {
    Processor w1(1), w2(2);

    for (int i = 1; i < 10; ++i)
    {
        w1.enqueue(i*10);
        w2.enqueue(i*20);

        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    }

    std::this_thread::sleep_for(std::chrono::seconds(4));
}

打印,例如

work work 1: 10
2: 20
work work 2: 40
1: 20
work 2: 60
work 1: 30
work 2: 80
work 1: 40
work 2: 100
work 1: 50
work 2: 120
work 1: 60
work 2: 140
work 1: 70
work 2: 160
work 1: 80
work 2: 180
work 1: 90