多生产者消费者程序中不规则的打印语句

Irregular print statements in multiple producer consumer program

共享队列是一个。生产者和消费者各有两个。

下面是输出结果,我把之后的程序贴出来了:

我的问题是打印语句 qDebug () << "\nConsumer: " << tId; 位于顶部,但它仍然没有首先打印。我想明白为什么。


Producer  140588830992128  couldn't push any data since queue was already full. Length of queue is:  10
Removed by thread Consumer:  140588814206720 , Length of queue is:  9
Removed by thread Consumer:  140588814206720 , Length of queue is:  8
Removed by thread Consumer:  140588814206720 , Length of queue is:  7
Removed by thread Consumer:  140588814206720 , Length of queue is:  6
Removed by thread Consumer:  140588814206720 , Length of queue is:  5
Consumer:  140588814206720
Removed by thread Consumer:  140588814206720 , Length of queue is:  4
Removed by thread Consumer:  140588814206720 , Length of queue is:  3
Removed by thread Consumer:  140588814206720 , Length of queue is:  2
Consumer:  140588814206720
Removed by thread Consumer:  140588814206720 , Length of queue is:  1
Consumer:  140588814206720
Removed by thread Consumer:  140588814206720 , Length of queue is:  0

这是我写的程序:

#include "mainwindow.h"
#include <QApplication>

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>

pthread_mutex_t mutexVariable     = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  conditionVariable = PTHREAD_COND_INITIALIZER;

int numberOfActiveProducers;
int numberOfActiveConsumers;

QList <int> sharedQueue;
/*
 * `sharedQueue`'s size is assumed to be 10 ATM.
 * `sharedQueue` is supposed to be shared among two threads.
 * Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
 * Assumption: `sharedQueue` can contain only 10 elements at a time.
 */

int sizeOfSharedQueue;

//  This function is run by the `Producer` threads.
void *producerThreadFunction (void *arg) {
    Q_UNUSED (arg);

    while (1) {
        pthread_t tId = pthread_self();
        qDebug () << "\nProducer: " << tId;

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () < 10) {
            sharedQueue.push_back (1);
            qDebug () << "\nPushed by Producer " << tId << ": " << "Length of queue is: " << sharedQueue.length ();
        }
        else {
            qDebug () << "\nProducer " << tId << " has no work to do since quque is full, and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
            pthread_cond_wait (&conditionVariable, &mutexVariable);
        }

        pthread_mutex_unlock (&mutexVariable);
    }

    return NULL;
}

//  This function is run by the `Consumer` threads.
void *consumerThreadFunction (void *arg) {
    Q_UNUSED (arg);

    while (1) {
        pthread_t tId = pthread_self ();
        qDebug () << "\nConsumer: " << tId;

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () > 0) {
            for (int u = 0; u < sharedQueue.length (); u++) {
                sharedQueue.pop_front ();
                qDebug () << "\nRemoved by thread Consumer: " << tId << ", Length of queue is: " << sharedQueue.length ();
            }
        }
        else {
            pthread_cond_signal (&conditionVariable);
            qDebug () << "\nSignal issued by thread Consumer: " << tId << ", Length of queue is: " << sharedQueue.length ();
        }

        pthread_mutex_unlock (&mutexVariable);
    }   
    return NULL;
}

int main (int argc, char *argv[]) {
    numberOfActiveProducers = 2;
    numberOfActiveConsumers = 2;
    sizeOfSharedQueue       = 10;

    // Producer threads creation
    pthread_t producerA;
    pthread_t producerB;

    if (pthread_create (&producerA, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer A\n");
        return 1;
    }

    if (pthread_create (&producerB, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer B\n");
        return 1;
    }

    // Consumer threads creation
    pthread_t consumerA;
    pthread_t consumerB;

    if (pthread_create (&consumerA, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer A\n");
        return 1;
    }

    if (pthread_create (&consumerB, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer B\n");
        return 1;
    }

    // Joining every thread
    if (pthread_join (producerA, NULL)) {
        fprintf (stderr, "Error joining thread Producer A\n");
        return 2;
    }

    if (pthread_join (producerB, NULL)) {
        fprintf (stderr, "Error joining thread Producer B\n");
        return 2;
    }

    if (pthread_join (consumerB, NULL)) {
        fprintf (stderr, "Error joining thread Consumer B\n");
        return 2;
    }

    if (pthread_join (consumerA, NULL)) {
        fprintf (stderr, "Error joining thread Consumer A\n");
        return 2;
    }

    QApplication a (argc, argv);
    MainWindow w;
    w.show ();

    return a.exec ();
}

添加了 std::cerr 的屏幕截图。输出无差异:

在我的系统上,我确实看到了您期望的输出,从以下内容开始:

Producer:  0x700000081000 
Consumer:  0x700000187000 
Producer:  0x700000104000 
Consumer:  0x70000020a000 

但这只是运气,因为似乎不能保证qDebug() 是线程安全的。一种解决方案可能是仅在持有互斥锁时进行日志记录。或者使用不同的日志记录工具。请注意,写入 std::cerr 是线程安全的,但如果您使用 << 写入多个标记,则可能最终会出现交错。所以你可以先格式化你的日志行,然后通过 std::cerr.write().

将它们写成 "atomically"