Irregular print statements in a multiple-producer, multiple-consumer program
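There is one shared queue, with two producers and two consumers.
The output is shown below, followed by the program.
My question is about the print statement qDebug () << "\nConsumer: " << tId;
It sits at the top of the consumer loop, yet it is not printed first. I would like to understand why.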
Producer 140588830992128 couldn't push any data since queue was already full. Length of queue is: 10
Removed by thread Consumer: 140588814206720 , Length of queue is: 9
Removed by thread Consumer: 140588814206720 , Length of queue is: 8
Removed by thread Consumer: 140588814206720 , Length of queue is: 7
Removed by thread Consumer: 140588814206720 , Length of queue is: 6
Removed by thread Consumer: 140588814206720 , Length of queue is: 5
Consumer: 140588814206720
Removed by thread Consumer: 140588814206720 , Length of queue is: 4
Removed by thread Consumer: 140588814206720 , Length of queue is: 3
Removed by thread Consumer: 140588814206720 , Length of queue is: 2
Consumer: 140588814206720
Removed by thread Consumer: 140588814206720 , Length of queue is: 1
Consumer: 140588814206720
Removed by thread Consumer: 140588814206720 , Length of queue is: 0
Here is the program I wrote:
#include "mainwindow.h"
#include <QApplication>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>
pthread_mutex_t mutexVariable = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t conditionVariable = PTHREAD_COND_INITIALIZER;
int numberOfActiveProducers;
int numberOfActiveConsumers;
QList <int> sharedQueue;
/*
* `sharedQueue`'s size is assumed to be 10 ATM.
* `sharedQueue` is supposed to be shared among two threads.
* Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
* Assumption: `sharedQueue` can contain only 10 elements at a time.
*/
int sizeOfSharedQueue;
// This function is run by the `Producer` threads.
void *producerThreadFunction (void *arg) {
    Q_UNUSED (arg);
    while (1) {
        pthread_t tId = pthread_self();
        qDebug () << "\nProducer: " << tId;
        pthread_mutex_lock (&mutexVariable);
        if (sharedQueue.length () < 10) {
            sharedQueue.push_back (1);
            qDebug () << "\nPushed by Producer " << tId << ": " << "Length of queue is: " << sharedQueue.length ();
        }
        else {
            qDebug () << "\nProducer " << tId << " has no work to do since quque is full, and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
            pthread_cond_wait (&conditionVariable, &mutexVariable);
        }
        pthread_mutex_unlock (&mutexVariable);
    }
    return NULL;
}
// This function is run by the `Consumer` threads.
void *consumerThreadFunction (void *arg) {
    Q_UNUSED (arg);
    while (1) {
        pthread_t tId = pthread_self ();
        qDebug () << "\nConsumer: " << tId;
        pthread_mutex_lock (&mutexVariable);
        if (sharedQueue.length () > 0) {
            for (int u = 0; u < sharedQueue.length (); u++) {
                sharedQueue.pop_front ();
                qDebug () << "\nRemoved by thread Consumer: " << tId << ", Length of queue is: " << sharedQueue.length ();
            }
        }
        else {
            pthread_cond_signal (&conditionVariable);
            qDebug () << "\nSignal issued by thread Consumer: " << tId << ", Length of queue is: " << sharedQueue.length ();
        }
        pthread_mutex_unlock (&mutexVariable);
    }
    return NULL;
}
int main (int argc, char *argv[]) {
    numberOfActiveProducers = 2;
    numberOfActiveConsumers = 2;
    sizeOfSharedQueue = 10;

    // Producer threads creation
    pthread_t producerA;
    pthread_t producerB;
    if (pthread_create (&producerA, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer A\n");
        return 1;
    }
    if (pthread_create (&producerB, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer B\n");
        return 1;
    }

    // Consumer threads creation
    pthread_t consumerA;
    pthread_t consumerB;
    if (pthread_create (&consumerA, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer A\n");
        return 1;
    }
    if (pthread_create (&consumerB, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer B\n");
        return 1;
    }

    // Joining every thread
    if (pthread_join (producerA, NULL)) {
        fprintf (stderr, "Error joining thread Producer A\n");
        return 2;
    }
    if (pthread_join (producerB, NULL)) {
        fprintf (stderr, "Error joining thread Producer B\n");
        return 2;
    }
    if (pthread_join (consumerB, NULL)) {
        fprintf (stderr, "Error joining thread Consumer B\n");
        return 2;
    }
    if (pthread_join (consumerA, NULL)) {
        fprintf (stderr, "Error joining thread Consumer A\n");
        return 2;
    }

    QApplication a (argc, argv);
    MainWindow w;
    w.show ();
    return a.exec ();
}
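One detail of the consumer loop may be relevant to the output above, whatever the logging facility does. The inner for loop re-reads sharedQueue.length () on every iteration while pop_front () shrinks the queue, so a single pass seems to remove only about half of the items before the outer while starts over and prints "Consumer:" again. The following is a minimal standalone sketch of that loop shape, not the original program: it uses std::deque in place of QList, runs on one thread, and popsPerPass is a hypothetical helper introduced only for illustration.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical helper: replays the consumer's inner loop on a queue that
// starts with `initialLength` items and returns how many items each pass
// (one iteration of the outer `while`) removes.
std::vector<int> popsPerPass(int initialLength) {
    std::deque<int> queue(static_cast<std::size_t>(initialLength), 1);
    std::vector<int> removedPerPass;
    while (!queue.empty()) {
        int removed = 0;
        // Same shape as the original loop: the bound queue.size() is
        // re-evaluated after every pop, so it shrinks as u grows.
        for (std::size_t u = 0; u < queue.size(); u++) {
            queue.pop_front();
            ++removed;
        }
        removedPerPass.push_back(removed);
    }
    return removedPerPass;
}
```

For a queue of 10 items, popsPerPass (10) gives passes of 5, 3, 1, and 1 removals, which lines up with the runs of "Removed by thread Consumer" lines between the "Consumer:" lines in the output above. If the intent is to empty the queue in one pass, a loop of the form while (!sharedQueue.isEmpty ()) would avoid the shrinking bound.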
I also tried std::cerr and added a screenshot. The output was no different.
On my system, I do see the output you expect, starting with:
Producer: 0x700000081000
Consumer: 0x700000187000
Producer: 0x700000104000
Consumer: 0x70000020a000
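But that is just luck, because qDebug() does not appear to be guaranteed to be thread-safe. One solution might be to log only while holding the mutex, or to use a different logging facility. Note that writing to std::cerr is thread-safe, but if you use << to write multiple tokens, the output of different threads may end up interleaved. So you could format each log line first and then write it "atomically" with a single call to std::cerr.write().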
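A minimal sketch of the format-first approach, assuming plain standard C++ rather than Qt: the whole line is built in a std::ostringstream and then handed to the stream in one write () call. The names formatLogLine and writeLogLine are hypothetical helpers, and the std::mutex inside writeLogLine is an extra assumption of this sketch, since the standard does not promise that a single write () is atomic with respect to other threads.

```cpp
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>

// Hypothetical helper: build the complete log line, newline included,
// before touching the shared stream.
std::string formatLogLine(const std::string &role, unsigned long threadId,
                          int queueLength) {
    std::ostringstream line;
    line << role << " " << threadId << ": length of queue is "
         << queueLength << '\n';
    return line.str();
}

// Hypothetical helper: emit one finished line with a single write() call.
// The mutex is an assumption of this sketch; it serializes writers so that
// lines cannot interleave even if write() itself is not atomic.
void writeLogLine(std::ostream &out, const std::string &line) {
    static std::mutex logMutex;
    std::lock_guard<std::mutex> guard(logMutex);
    out.write(line.data(), static_cast<std::streamsize>(line.size()));
    out.flush();
}
```

A thread would then call something like writeLogLine (std::cerr, formatLogLine ("Consumer", id, length)), where id and length stand for whatever values it wants to report. Taking the stream as a parameter keeps the helper easy to redirect to a file or a string buffer.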