C++ - 多线程 - 线程之间的通信
C++ - Multi-threading - Communication between threads
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
using namespace std;
//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;
void generateNumbers(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone){
while(!workdone) {
unique_lock<std::mutex> lk(m);
int rndNum = rand() % 100;
numbers.push(rndNum);
producer_count++;
cv.notify_one();
}
}
void work(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone) {
while(!workdone) {
unique_lock<std::mutex> lk(m);
cv.wait(lk);
cout << numbers.front() << endl;
numbers.pop();
consumer_count++;
}
}
int main() {
condition_variable cv;
mutex m;
bool workdone = false;
queue<int> numbers;
//start threads
thread producer(generateNumbers, ref(numbers), ref(cv), ref(m), ref(workdone));
thread consumer(work, ref(numbers), ref(cv), ref(m), ref(workdone));
//wait for 3 seconds, then join the threads
this_thread::sleep_for(std::chrono::seconds(3));
workdone = true;
producer.join();
consumer.join();
//output the counters
cout << producer_count << endl;
cout << consumer_count << endl;
return 0;
}
大家好,
我尝试用 C++ 实现生产者-消费者模式。
生产者线程生成随机整数,将它们添加到队列中,然后通知消费者线程添加了新数字。
消费者线程等待通知,然后将队列的第一个元素打印到控制台并删除。
我为每个添加到队列中的号码增加了一个计数器,为每个从队列中取出的号码增加了另一个计数器。
我希望程序完成后两个计数器保持相同的值,但是差异很大。
表示添加到队列中的计数器始终在百万范围内(在我上次测试中为 3871876)并且表示从队列中取出数字的消费者的计数器始终低于 100k(在我上次测试中为 89993)。
有人能给我解释一下为什么会有这么大的差异吗?
我是否必须添加另一个条件变量,以便生产者线程也等待消费者线程?
谢谢!
请注意,此代码可能无法正常工作。
workdone 变量定义为常规布尔值
编译器认为它可以安全地优化掉是完全合法的,因为它在代码块内永远不会改变。
如果您对添加 volatile 反应不快……不,那也行不通。
您需要正确同步对 workdone 变量的访问,因为两个线程都在读取,而另一个线程(ui 线程)正在写入。
另一种解决方案是使用事件之类的东西而不是简单的变量。
但是对你的问题的解释。
两个线程具有相同的结束条件 (!workdone),但它们具有不同的持续时间,因此目前无法保证生产者和消费者随着时间的推移以相似的循环次数以某种方式同步到 运行。
不需要一秒钟 std::condition_variable
,只需重复使用已有的即可。正如其他人所提到的,您应该考虑使用 std::atomic<bool>
而不是普通的 bool
。但我必须承认,带有 -O3 的 g++ 并没有优化它。
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>
//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;
void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
while(!workdone.load())
{
std::unique_lock<std::mutex> lk(m);
int rndNum = rand() % 100;
numbers.push(rndNum);
producer_count++;
cv.notify_one(); // Notify worker
cv.wait(lk); // Wait for worker to complete
}
}
void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
while(!workdone.load())
{
std::unique_lock<std::mutex> lk(m);
cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
cv.wait(lk); // Wait for the generator to complete
std::cout << numbers.front() << std::endl;
numbers.pop();
consumer_count++;
}
}
int main() {
std::condition_variable cv;
std::mutex m;
std::atomic<bool> workdone(false);
std::queue<int> numbers;
//start threads
std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
//wait for 3 seconds, then join the threads
std::this_thread::sleep_for(std::chrono::seconds(3));
workdone = true;
cv.notify_all(); // To prevent dead-lock
producer.join();
consumer.join();
//output the counters
std::cout << producer_count << std::endl;
std::cout << consumer_count << std::endl;
return 0;
}
编辑:
为了避免偶发的差一错误,你可以使用这个:
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>
//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;
void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
while(!workdone.load())
{
std::unique_lock<std::mutex> lk(m);
int rndNum = rand() % 100;
numbers.push(rndNum);
producer_count++;
cv.notify_one(); // Notify worker
cv.wait(lk); // Wait for worker to complete
}
}
void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
while(!workdone.load() or !numbers.empty())
{
std::unique_lock<std::mutex> lk(m);
cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
if (numbers.empty())
cv.wait(lk); // Wait for the generator to complete
if (numbers.empty())
continue;
std::cout << numbers.front() << std::endl;
numbers.pop();
consumer_count++;
}
}
int main() {
std::condition_variable cv;
std::mutex m;
std::atomic<bool> workdone(false);
std::queue<int> numbers;
//start threads
std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
//wait for 3 seconds, then join the threads
std::this_thread::sleep_for(std::chrono::seconds(1));
workdone = true;
cv.notify_all(); // To prevent dead-lock
producer.join();
consumer.join();
//output the counters
std::cout << producer_count << std::endl;
std::cout << consumer_count << std::endl;
return 0;
}
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
using namespace std;
//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;
void generateNumbers(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone){
while(!workdone) {
unique_lock<std::mutex> lk(m);
int rndNum = rand() % 100;
numbers.push(rndNum);
producer_count++;
cv.notify_one();
}
}
void work(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone) {
while(!workdone) {
unique_lock<std::mutex> lk(m);
cv.wait(lk);
cout << numbers.front() << endl;
numbers.pop();
consumer_count++;
}
}
int main() {
condition_variable cv;
mutex m;
bool workdone = false;
queue<int> numbers;
//start threads
thread producer(generateNumbers, ref(numbers), ref(cv), ref(m), ref(workdone));
thread consumer(work, ref(numbers), ref(cv), ref(m), ref(workdone));
//wait for 3 seconds, then join the threads
this_thread::sleep_for(std::chrono::seconds(3));
workdone = true;
producer.join();
consumer.join();
//output the counters
cout << producer_count << endl;
cout << consumer_count << endl;
return 0;
}
大家好, 我尝试用 C++ 实现生产者-消费者模式。 生产者线程生成随机整数,将它们添加到队列中,然后通知消费者线程添加了新数字。
消费者线程等待通知,然后将队列的第一个元素打印到控制台并删除。
我为每个添加到队列中的号码增加了一个计数器,为每个从队列中取出的号码增加了另一个计数器。
我希望程序完成后两个计数器保持相同的值,但是差异很大。 表示添加到队列中的计数器始终在百万范围内(在我上次测试中为 3871876)并且表示从队列中取出数字的消费者的计数器始终低于 100k(在我上次测试中为 89993)。
有人能给我解释一下为什么会有这么大的差异吗? 我是否必须添加另一个条件变量,以便生产者线程也等待消费者线程? 谢谢!
请注意,此代码可能无法正常工作。 workdone 变量定义为常规布尔值 编译器认为它可以安全地优化掉是完全合法的,因为它在代码块内永远不会改变。
如果您对添加 volatile 反应不快……不,那也行不通。 您需要正确同步对 workdone 变量的访问,因为两个线程都在读取,而另一个线程(ui 线程)正在写入。 另一种解决方案是使用事件之类的东西而不是简单的变量。
但是对你的问题的解释。 两个线程具有相同的结束条件 (!workdone),但它们具有不同的持续时间,因此目前无法保证生产者和消费者随着时间的推移以相似的循环次数以某种方式同步到 运行。
不需要一秒钟 std::condition_variable
,只需重复使用已有的即可。正如其他人所提到的,您应该考虑使用 std::atomic<bool>
而不是普通的 bool
。但我必须承认,带有 -O3 的 g++ 并没有优化它。
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>
//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;
void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
while(!workdone.load())
{
std::unique_lock<std::mutex> lk(m);
int rndNum = rand() % 100;
numbers.push(rndNum);
producer_count++;
cv.notify_one(); // Notify worker
cv.wait(lk); // Wait for worker to complete
}
}
void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
while(!workdone.load())
{
std::unique_lock<std::mutex> lk(m);
cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
cv.wait(lk); // Wait for the generator to complete
std::cout << numbers.front() << std::endl;
numbers.pop();
consumer_count++;
}
}
int main() {
std::condition_variable cv;
std::mutex m;
std::atomic<bool> workdone(false);
std::queue<int> numbers;
//start threads
std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
//wait for 3 seconds, then join the threads
std::this_thread::sleep_for(std::chrono::seconds(3));
workdone = true;
cv.notify_all(); // To prevent dead-lock
producer.join();
consumer.join();
//output the counters
std::cout << producer_count << std::endl;
std::cout << consumer_count << std::endl;
return 0;
}
编辑:
为了避免偶发的差一错误,你可以使用这个:
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>
//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;
void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
while(!workdone.load())
{
std::unique_lock<std::mutex> lk(m);
int rndNum = rand() % 100;
numbers.push(rndNum);
producer_count++;
cv.notify_one(); // Notify worker
cv.wait(lk); // Wait for worker to complete
}
}
void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
while(!workdone.load() or !numbers.empty())
{
std::unique_lock<std::mutex> lk(m);
cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
if (numbers.empty())
cv.wait(lk); // Wait for the generator to complete
if (numbers.empty())
continue;
std::cout << numbers.front() << std::endl;
numbers.pop();
consumer_count++;
}
}
int main() {
std::condition_variable cv;
std::mutex m;
std::atomic<bool> workdone(false);
std::queue<int> numbers;
//start threads
std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
//wait for 3 seconds, then join the threads
std::this_thread::sleep_for(std::chrono::seconds(1));
workdone = true;
cv.notify_all(); // To prevent dead-lock
producer.join();
consumer.join();
//output the counters
std::cout << producer_count << std::endl;
std::cout << consumer_count << std::endl;
return 0;
}