我应该在这里使用哪个同步原语?

Which synchronization primitive should I employ here?

while(1) {
     char message_buffer[SIZE];
     ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);

     if(message_len == -1) { /* error handling... */}

     pthread_t pt1;
     int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
     if(ret) { /* error handling ... */}
}

void * handle_message (void * message) {
    puts((char *) message);
    return NULL;
}

上面的例子不是MRE但是非常简单:

我有一个主线程,它有一个不断消耗消息队列中消息的循环。收到新消息后,它会存储在本地 message_buffer 缓冲区中。然后,新线程被派生到所述新消息的"take care",因此消息缓冲区的地址被传递到handle_message,新线程随后执行。


问题

通常,2 个线程会打印相同的消息,即使我可以 100% 确定队列中的消息不相同。


我不完全确定,但我想我明白为什么会这样:

假设我将 2 条不同的消息推送到 mqueue,然后我才开始使用它们。

while 循环的第一次迭代中,消息将从队列中获取并保存到 message_buffer。将生成一个新线程并将 message_length 的地址传递给它。但是该线程可能不够快,无法在 下一条消息被消耗之前(在循环的下一次迭代中)将缓冲区的内容打印到流 ,并且 message_buffer 随后被覆盖。因此,第一个和第二个线程现在打印相同的值。


我的问题是:解决这个问题最有效的方法是什么?我是并行编程的新手,threading/pthreads 不同的同步原语让我不知所措。

互斥问题

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

while(1) {
     char message_buffer[SIZE];
     pthread_mutex_lock(&m);
     ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
     pthred_mutex_unlock(&m);

     if(message_len == -1) { /* error handling... */}

     pthread_t pt1;
     int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
     if(ret) { /* error handling ... */}
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    pthread_mutex_lock(&m);
    strncpy(own_buffer, (char *) message, SIZE);
    pthread_mutex_unlock(&m);
    puts(own_buffer);
    return NULL;
}

我认为我当前的互斥实现不正确,因为线程仍在接收重复的消息。主线程可以锁定互斥锁,将消息消耗到缓冲区中,解锁互斥锁,生成一个线程,但该线程仍然可能挂起,而主线程可能只是再次重写缓冲区(因为缓冲区互斥锁从未被新锁定线程),有效地使我当前的互斥锁实现无用?我该如何克服这个问题?

问题是您在保证线程已完成该内存之前就结束了包含 message_buffer 的循环。

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
    if (ret) { /* error handling */ }

    /****** Can't go beyond here until thread is done with message_buffer. ******/
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    strncpy(own_buffer, (char *) message, SIZE);

    /******* Only now can the caller loop back. ******/

    puts(own_buffer);
    return NULL;
}

您可以使用信号量或类似工具。

static pthread_mutex_t mutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond   = PTHREAD_COND_INITIALIZER;
static int             copied = 0;

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
    if (ret) { /* error handling */ }

    // Wait until threads is done with message_buffer.
    pthread_mutex_lock(&mutex);
    while (!copied) pthread_cond_wait(&cond, &mutex);
    copied = 0;
    pthread_mutex_unlock(&mutex);
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    strncpy(own_buffer, (char *) message, SIZE);

    // Done with caller's buffer.
    // Signal caller to continue.
    pthread_mutex_lock(&mutex);
    copied = 1;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);

    puts(own_buffer);
    return NULL;
}

(添加的块有效地执行了信号量操作。请参阅 的最后一个片段以获得更通用的实现。)

但是有一个更简单的解决方案:在创建线程之前制作副本。

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, strdup(message_buffer));
    if (ret) { /* error handling */ }
}

void * handle_message (void * message) {
    char * own_buffer = message;
    puts(own_buffer);
    free(own_buffer);
    return NULL;
}