正确使用std::condition_variable触发定时执行
Correct usage of std::condition_variable to trigger timed execution
我正在尝试以固定的时间间隔执行一段代码。我有一些基于 naked pthread
的东西,现在我想使用 std::thread
做同样的事情。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
bool running;
std::mutex mutex;
std::condition_variable cond;
void timer(){
while(running) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::lock_guard<std::mutex> guard(mutex);
cond.notify_one();
}
cond.notify_one();
}
void worker(){
while(running){
std::unique_lock<std::mutex> mlock(mutex);
cond.wait(mlock);
std::cout << "Hello World" << std::endl;
//... do something that takes a variable amount of time ...//
}
}
int main(){
running = true;
auto t_work = std::thread(worker);
auto t_time = std::thread(timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
running = false;
t_time.join();
t_work.join();
}
worker
在现实中做一些需要可变时间的事情,但应该以固定的时间间隔安排。它似乎有效,但我对此很陌生,所以有些事情我不清楚...
为什么我需要 mutex
?我并没有真正使用条件,但是只要 timer
发送信号,工作人员就应该完成它的工作。
timer
真的需要在循环后再次调用cond.notify_one()
吗?这是从旧代码中获取的,iirc 的原因是为了防止 worker
永远等待,以防 timer
在 worker
仍在等待时完成。
我需要 running
标志,还是有更好的方法 break
跳出循环?
PS:我知道还有其他方法可以确保固定的时间间隔,而且我知道我目前的方法存在一些问题(例如,如果 worker
需要比timer
使用的间隔)。但是,我想先了解那段代码,然后再对其进行过多更改。
Why do I need a mutex at all? I do not really use a condition, but whenever the timer sends a signal, the worker should do its job.
您需要互斥锁的原因是等待条件满足的线程可能会被虚假唤醒。为了确保您的线程确实收到了条件已正确满足的通知,您需要检查并应该在 wait 调用中使用 lambda 进行检查。并且为了保证变量在虚假唤醒之后不被修改,但在您检查变量之前,您需要获取一个互斥锁,这样您的线程是唯一可以修改条件的线程。在您的情况下,这意味着您需要为工作线程添加一种方法来实际验证计时器是否 运行 结束。
Does the timer really need to call cond.notify_one() again after the loop? This was taken from the older code and iirc the reasoning is to prevent the worker to wait forever, in case the timer finishes while the worker is still waiting.
如果在循环后不调用通知,工作线程将无限期地等待。因此,要干净地退出程序,您实际上应该调用 notify_all() 以确保每个等待条件变量的线程都被唤醒并可以干净地终止。
Do I need the running flag, or is there a nicer way to break out of the loops?
一个 运行ning 标志是完成你想要的最干净的方法。
我们先来了解一下背景概念。
关键部分
首先,需要 Mutex 来相互排除对 critical section. Usually, critical section is considered to be shared resource. E.g. a Queue, Some I/O (e.g. socket) etc. In plain words Mutex is used to guard shared resource agains a Race Condition 的访问,这 可以 使资源进入未定义状态。
示例:生产者/消费者问题
队列应该包含一些要完成的工作项目。可能有多个线程将一些工作项目放入队列(即生产项目 => 生产者线程)和多个线程使用这些项目并执行 smth。对它们很有用(=> 消费者线程)。
Put 和 Consume 操作修改 Queue(尤其是其存储和内部表示)。因此,当 运行ning 放置或使用操作时,我们希望排除其他操作执行相同的操作。这就是 Mutex 发挥作用的地方。在一个非常基本的配置中,只有一个线程(无论是生产者还是消费者)可以访问互斥体,即锁定它。存在一些其他高级锁定原语以根据使用场景增加吞吐量(例如 ReaderWriter Locks)
条件变量的概念
condition_variable::notify_one
唤醒一个当前等待的线程。至少有一个线程必须等待这个变量:
- 如果没有线程正在等待此变量,发布的事件将会丢失。
- 如果有一个等待线程,它将唤醒并开始 运行ning,只要它可以锁定与条件变量关联的互斥量。因此,如果启动
notify_one
或 notify_all
调用的线程没有放弃互斥锁(例如 mutex::unlock()
或 condition_variable::wait()
),则唤醒线程将不会 运行.
在timer()
线程互斥量在notify_one()
调用后被解锁,因为作用域结束并且guard
对象被销毁(析构函数隐式调用mutex::unlock()
)
这种方法的问题
取消和变量缓存
允许编译器缓存变量的值。因此,将 running
设置为 true
可能不起作用,因为变量的值可能会被缓存。为避免这种情况,您需要将 running
声明为 volatile
或 std::atomic<bool>
.
worker
话题
您指出 worker
需要在某些时间间隔内 运行 并且可能 运行 的时间长短不一。 timer
线程 只能 在 worker
线程完成后 运行。为什么此时需要另一个线程来测量时间?这两个线程总是 运行 作为一个线性块并且没有 critical 部分! 为什么不在任务执行之后放置所需的 sleep
调用并在时间结束后立即开始 运行ning? 结果只有 std::cout
是共享资源。但目前它是从一个线程使用的。否则,你需要一个互斥体(没有条件变量)来保护对 cout
的写入。
#include <thread>
#include <atomic>
#include <iostream>
#include <chrono>
std::atomic_bool running = false;
void worker(){
while(running){
auto start_point = std::chrono::system_clock::now();
std::cout << "Hello World" << std::endl;
//... do something that takes a variable amount of time ...//
std::this_thread::sleep_until(start_point+std::chrono::milliseconds(1000));
}
}
int main(){
running = true;
auto t_work = std::thread(worker);
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
running = false;
t_work.join();
}
注意: 在工作线程中调用 sleep_until
如果您的任务从 start_point
阻塞的时间超过 1000 毫秒,则执行将被阻塞。
我正在尝试以固定的时间间隔执行一段代码。我有一些基于 naked pthread
的东西,现在我想使用 std::thread
做同样的事情。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
bool running;
std::mutex mutex;
std::condition_variable cond;
void timer(){
while(running) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::lock_guard<std::mutex> guard(mutex);
cond.notify_one();
}
cond.notify_one();
}
void worker(){
while(running){
std::unique_lock<std::mutex> mlock(mutex);
cond.wait(mlock);
std::cout << "Hello World" << std::endl;
//... do something that takes a variable amount of time ...//
}
}
int main(){
running = true;
auto t_work = std::thread(worker);
auto t_time = std::thread(timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
running = false;
t_time.join();
t_work.join();
}
worker
在现实中做一些需要可变时间的事情,但应该以固定的时间间隔安排。它似乎有效,但我对此很陌生,所以有些事情我不清楚...
为什么我需要
mutex
?我并没有真正使用条件,但是只要timer
发送信号,工作人员就应该完成它的工作。timer
真的需要在循环后再次调用cond.notify_one()
吗?这是从旧代码中获取的,iirc 的原因是为了防止worker
永远等待,以防timer
在worker
仍在等待时完成。我需要
running
标志,还是有更好的方法break
跳出循环?
PS:我知道还有其他方法可以确保固定的时间间隔,而且我知道我目前的方法存在一些问题(例如,如果 worker
需要比timer
使用的间隔)。但是,我想先了解那段代码,然后再对其进行过多更改。
Why do I need a mutex at all? I do not really use a condition, but whenever the timer sends a signal, the worker should do its job.
您需要互斥锁的原因是等待条件满足的线程可能会被虚假唤醒。为了确保您的线程确实收到了条件已正确满足的通知,您需要检查并应该在 wait 调用中使用 lambda 进行检查。并且为了保证变量在虚假唤醒之后不被修改,但在您检查变量之前,您需要获取一个互斥锁,这样您的线程是唯一可以修改条件的线程。在您的情况下,这意味着您需要为工作线程添加一种方法来实际验证计时器是否 运行 结束。
Does the timer really need to call cond.notify_one() again after the loop? This was taken from the older code and iirc the reasoning is to prevent the worker to wait forever, in case the timer finishes while the worker is still waiting.
如果在循环后不调用通知,工作线程将无限期地等待。因此,要干净地退出程序,您实际上应该调用 notify_all() 以确保每个等待条件变量的线程都被唤醒并可以干净地终止。
Do I need the running flag, or is there a nicer way to break out of the loops?
一个 运行ning 标志是完成你想要的最干净的方法。
我们先来了解一下背景概念。
关键部分
首先,需要 Mutex 来相互排除对 critical section. Usually, critical section is considered to be shared resource. E.g. a Queue, Some I/O (e.g. socket) etc. In plain words Mutex is used to guard shared resource agains a Race Condition 的访问,这 可以 使资源进入未定义状态。
示例:生产者/消费者问题
队列应该包含一些要完成的工作项目。可能有多个线程将一些工作项目放入队列(即生产项目 => 生产者线程)和多个线程使用这些项目并执行 smth。对它们很有用(=> 消费者线程)。
Put 和 Consume 操作修改 Queue(尤其是其存储和内部表示)。因此,当 运行ning 放置或使用操作时,我们希望排除其他操作执行相同的操作。这就是 Mutex 发挥作用的地方。在一个非常基本的配置中,只有一个线程(无论是生产者还是消费者)可以访问互斥体,即锁定它。存在一些其他高级锁定原语以根据使用场景增加吞吐量(例如 ReaderWriter Locks)
条件变量的概念
condition_variable::notify_one
唤醒一个当前等待的线程。至少有一个线程必须等待这个变量:
- 如果没有线程正在等待此变量,发布的事件将会丢失。
- 如果有一个等待线程,它将唤醒并开始 运行ning,只要它可以锁定与条件变量关联的互斥量。因此,如果启动
notify_one
或notify_all
调用的线程没有放弃互斥锁(例如mutex::unlock()
或condition_variable::wait()
),则唤醒线程将不会 运行.
在timer()
线程互斥量在notify_one()
调用后被解锁,因为作用域结束并且guard
对象被销毁(析构函数隐式调用mutex::unlock()
)
这种方法的问题
取消和变量缓存
允许编译器缓存变量的值。因此,将 running
设置为 true
可能不起作用,因为变量的值可能会被缓存。为避免这种情况,您需要将 running
声明为 volatile
或 std::atomic<bool>
.
worker
话题
您指出 worker
需要在某些时间间隔内 运行 并且可能 运行 的时间长短不一。 timer
线程 只能 在 worker
线程完成后 运行。为什么此时需要另一个线程来测量时间?这两个线程总是 运行 作为一个线性块并且没有 critical 部分! 为什么不在任务执行之后放置所需的 sleep
调用并在时间结束后立即开始 运行ning? 结果只有 std::cout
是共享资源。但目前它是从一个线程使用的。否则,你需要一个互斥体(没有条件变量)来保护对 cout
的写入。
#include <thread>
#include <atomic>
#include <iostream>
#include <chrono>
std::atomic_bool running = false;
void worker(){
while(running){
auto start_point = std::chrono::system_clock::now();
std::cout << "Hello World" << std::endl;
//... do something that takes a variable amount of time ...//
std::this_thread::sleep_until(start_point+std::chrono::milliseconds(1000));
}
}
int main(){
running = true;
auto t_work = std::thread(worker);
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
running = false;
t_work.join();
}
注意: 在工作线程中调用 sleep_until
如果您的任务从 start_point
阻塞的时间超过 1000 毫秒,则执行将被阻塞。