'std::system_error',生产者消费者C++并发

'std::system_error', producer consumer C++ concurrency

下面的Producer/Consumer程序应该一次将一个字符传输到缓冲区,然后打印出来。该程序最初运行,但随后总是在消费者循环的第三次迭代中失败。 -pthread 在编译时包含。

程序应该能够遍历 'poem' 变量,最终导致整首诗逐字打印出来。

#include <iostream>
#include <cmath>
#include <cstdlib>
#include <thread>
#include <fstream>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>

using namespace std;

mutex mtx;
condition_variable cond_var;
int thread_ID = 1;

vector<char> txtImport(vector<char> output){
  ifstream file("poem.txt");
  char character;
  while (file.get(character)){
    output.push_back(character);
  }
  return output;
}

void producer(vector<char> poem, char* buffer, int poem_size) {
  cout << "\n\nPROD-  \n";
  int poem_position = 68;
  const int producer_ID = 1;
  unique_lock<mutex> lock(mtx);
  while(true){
    cout << "\n1";
    if(poem_position == poem_size) {exit(1);}
    if(thread_ID!=producer_ID) {cond_var.wait(lock);}
    else{
      cout << "\nlock\n";

      *buffer = poem[poem_position];
      poem_position += 1;
      thread_ID = 2;

      cout << poem_position << " 2 \n";
      cout << poem[poem_position] << " 4 \n";
      cout << "CHAR: " << *buffer << " \n------\n\n";

      lock.unlock();
      cond_var.notify_all();
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
  }
}

void consumer(char* buffer){
  cout << "\n\n       -CONS\n";
  const int consumer_ID = 2;
  unique_lock<mutex> lock(mtx);
  while(true){
    cout << "\n       one ";
    if(thread_ID!=consumer_ID) {cond_var.wait(lock);}
    else{
      cout << "\n       lock\n";
      cout << "       ->" << *buffer << " <-\n\n";
      thread_ID = 1;
      lock.unlock();
      cond_var.notify_all();
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
  }
}

int main(void) {

    vector<char> poem;
    poem = txtImport(poem);
    char buffer = 'z';
    int poem_size = poem.size();

    cout << "\n--MAIN--\n";

    thread thread_two(consumer, &buffer);
    thread thread_one(producer, poem, &buffer, poem_size);


    thread_one.join();
    thread_two.join();

    return 0;

}

程序输出为:

--MAIN--


       -CONS

       one 

PROD-  

1
lock
69 2 
b 4 
CHAR: m 
------


       one 
       lock
       ->m <-


1
lock
70 2 
e 4 
CHAR: b 
------


       one 
       lock
       ->b <-

terminate called after throwing an instance of 'std::system_error'
terminate called recursively
  what():  Aborted (core dumped)

您有未定义的行为。

调用cond_var.wait时,传递的互斥锁必须被当前线程锁定。它不会发生在您的代码中(第一次迭代除外)。

您的案例:

unique_lock<mutex> lock(mtx);
while (true)
   cond_var.wait(lock);
   lock.unlock();

所以在 while 循环的第二次迭代中,当 wait 被调用时 lock 被解锁。错了。

要修复,请将 unique_lock 构造移到 both 中(consumer/producer 函数中的相同问题)while循环。


lock - an object of type std::unique_lock, which must be locked by the current thread

来自 condition_variable::wait reference