无法重新获取互斥量并在线程之间正确传递值

unable to reacquire mutex and pass values correctly between threads

我正在尝试实现一个代码来练习同步,因此可能不是最佳设计或方法,但目标如下


#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdbool.h>

#define WORKERS 2
#define ARRAY_ELEMENTS 100
#define MAX 1000


pthread_mutex_t mutex_bucket1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_signal = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_go = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond_busy = PTHREAD_COND_INITIALIZER;

static int value = 0;
bool available = false;

void *worker_thread(void *pbucket)
{
    sleep(5);
    while(1)
    {
        unsigned int count = 0;
        int local_array[ARRAY_ELEMENTS];
        int *ptbucket = (int*)pbucket;
        setbuf(stdout, NULL);

        pthread_mutex_lock(&mutex_signal);
        printf(" --------------   \n chainging state to available \n --------- ");
        available = true;
        printf(" --------------   \n from thread sending go signal \n --------- ");
        pthread_cond_signal(&cond_go);
        pthread_mutex_unlock(&mutex_signal);


        pthread_mutex_lock(&mutex_bucket1);
        printf(" --------------   \n data part locked in thread for copying \n --------- ");
        while(count < ARRAY_ELEMENTS)
        {
            printf(" %d - \n", ptbucket[count]);  /***incorrect values***/
            local_array[count] = ptbucket[count];
            count++;
        }
        pthread_mutex_unlock(&mutex_bucket1);

        /*Never able to acquire mutex_signal and change state to not available*/   **BUG**
        pthread_mutex_lock(&mutex_signal);
        printf(" --------------   \n chainging state to not available \n --------- ");
        available = false;
        pthread_mutex_unlock(&mutex_signal);

        count = 0;

        while(count < ARRAY_ELEMENTS)
        {
            printf(" %d - \n", local_array[count]);
            count++;
        }

        printf(" --------------   \n about to sleep for 5secs \n --------- ");
        sleep(5);
    }
}

int main(void)
{
    pthread_t thread_id[WORKERS];

    unsigned int* pbucket1 = (int*) malloc(sizeof(int) * ARRAY_ELEMENTS);

    unsigned int* pbucket;

    for(int i = 0; i < WORKERS - 1; i++)
    {
        pthread_create(&thread_id[i], NULL, worker_thread, (void *) pbucket);
    }

    for(int i = 0; i < MAX; i++)
    {
        unsigned int count = 0;

        pbucket = pbucket1;

        // Make the payload ready
        pthread_mutex_lock(&mutex_bucket1);

        printf(" -------------- creating data payload --------- \n");

        while(count < ARRAY_ELEMENTS)
        {
            pbucket1[count] = i;
            i++;
            count++;
        }

        printf(" --------------   \n waiting for go signal \n --------- ");

        while(!available)
        {
            pthread_cond_wait(&cond_go, &mutex_signal);
        }

        pthread_mutex_unlock(&mutex_bucket1);

        /*I believe after we unlock variable "available" can be mutexed
          again by other thread but seems thinking is flawed */

        printf(" --------------   \n Main thread sleep for 3 seconds  \n --------- ");
        sleep(3);
    }

    for(int i = 0; i < WORKERS; i++)
    {
        pthread_join(thread_id[i], NULL);
    }

    return 0;
}

我认为你的某些想法是倒退的;等待的不应该是主上下文,应该是等待数据的工作线程...

主线程的工作应该是不断填充负载并一次唤醒一个线程来处理它。

所以这里有一些潦草的代码,我认为更明智一些:

/**
    file: answer.c
    compile: gcc -o answer answer.c -pthread
    usage: answer [numThreads] [numElements]
**/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#define STATE_WAIT    1
#define STATE_READY   2

void *routine(void*);

typedef struct _shared_t {
    pthread_mutex_t     m;
    pthread_cond_t      c;
    unsigned char       state;
    int                 *payload;
    size_t              numElements;
    pthread_t           *threads;
    size_t              numThreads;
} shared_t;

static inline void shared_init(shared_t *shared, size_t numThreads, size_t numElements) {
    memset(shared, 0, sizeof(shared_t));

    pthread_mutex_init(&shared->m, NULL);
    pthread_cond_init(&shared->c, NULL);

    shared->state = STATE_WAIT;

    shared->numThreads = numThreads;
    shared->numElements = numElements;

    {
        int it = 0;

        shared->threads = (pthread_t*) calloc(shared->numThreads, sizeof(pthread_t));

        while (it < shared->numThreads) {
            if (pthread_create(&shared->threads[it], NULL, routine, shared) != 0) {
                break;
            }
            it++;
        }
    }
}

static inline void shared_populate(shared_t *shared) {
    if (pthread_mutex_lock(&shared->m) != 0) {
        return;
    }

    shared->payload = (int*) calloc(shared->numElements, sizeof(int));  

    {
        int it = 0,
             end = shared->numElements;

        while (it < end) {
            shared->payload[it] = rand();

            it++;
        }
    }

    shared->state = STATE_READY;

    pthread_cond_signal(&shared->c);

    pthread_mutex_unlock(&shared->m);
}

static inline void shared_cleanup(shared_t *shared) {
    int it = 0,
         end = shared->numThreads;

    while (it < end) {
        pthread_join(shared->threads[it], NULL);
    }

    pthread_mutex_destroy(&shared->m);
    pthread_cond_destroy(&shared->c);

    free(shared->threads);
}

void* routine(void *arg) {
    shared_t *shared = (shared_t*) arg;
    int *payload;

    do {
        if (pthread_mutex_lock(&shared->m) != 0) {
            break;
        }

        while (shared->state == STATE_WAIT) {
            pthread_cond_wait(&shared->c, &shared->m);
        }

        payload = shared->payload;

        shared->state = STATE_WAIT;

        pthread_mutex_unlock(&shared->m);

        if (payload) {
            int it = 0,
                 end = shared->numElements;

            while (it < end) {
                printf("Thread #%ld got payload %p(%d)=%d\n", 
                    pthread_self(), payload, it, payload[it]);
                it++;
            }

            free(payload);
        }
    } while(1);

    pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
    shared_t shared;

    int numThreads = argc > 1 ? atoi(argv[1]) : 1;
    int numElements   = argc > 2 ? atoi(argv[2]) : 100;

    shared_init(&shared, numThreads, numElements);

    do {
        shared_populate(&shared);
    } while (1);

    shared_cleanup(&shared);

    return 0;
}

显然,上面的代码对错误的容忍度不高,并且不容易干净地关闭...仅供说明。

我们先来看一下main,这样我们就知道主程序的流程是怎样的:

int main(int argc, char *argv[]) {
    shared_t shared;

    int numThreads = argc > 1 ? atoi(argv[1]) : 1;
    int numElements   = argc > 2 ? atoi(argv[2]) : 100;

    shared_init(&shared, numThreads, numElements);

    do {
        shared_populate(&shared);
    } while (1);

    shared_cleanup(&shared);

    return 0;
}

它在堆栈上保留 shared_t

typedef struct _shared_t {
    pthread_mutex_t     m;
    pthread_cond_t      c;
    unsigned char       state;
    int                 *payload;
    size_t              numElements;
    pthread_t           *threads;
    size_t              numThreads;
} shared_t;

同步需要大部分自我解释、互斥、条件和状态。

首先 shared_t 必须使用提供的选项用互斥体、条件、状态和线程进行初始化:

static inline void shared_init(shared_t *shared, size_t numThreads, size_t numElements) {
    memset(shared, 0, sizeof(shared_t));

    pthread_mutex_init(&shared->m, NULL);
    pthread_cond_init(&shared->c, NULL);

    shared->state = STATE_WAIT;

    shared->numThreads = numThreads;
    shared->numElements = numElements;

    {
        int it = 0;

        shared->threads = (pthread_t*) calloc(shared->numThreads, sizeof(pthread_t));

        while (it < shared->numThreads) {
            if (pthread_create(&shared->threads[it], NULL, routine, shared) != 0) {
                break;
            }
            it++;
        }
    }
}

当工作线程被这个例程创建时,它们被强制进入等待状态。

循环中对shared_populate的第一次调用在将负载设置为一些随机数后唤醒第一个线程:

static inline void shared_populate(shared_t *shared) {
    if (pthread_mutex_lock(&shared->m) != 0) {
        return;
    }

    shared->payload = (int*) calloc(shared->numElements, sizeof(int));  

    {
        int it = 0,
             end = shared->numElements;

        while (it < end) {
            shared->payload[it] = rand();

            it++;
        }
    }

    shared->state = STATE_READY;

    pthread_cond_signal(&shared->c);

    pthread_mutex_unlock(&shared->m);
}

注意 pthread_cond_signal 而不是 pthread_cond_broadcast,因为我们只想唤醒第一个线程。

void* routine(void *arg) {
    shared_t *shared = (shared_t*) arg;
    int *payload;

    do {
        if (pthread_mutex_lock(&shared->m) != 0) {
            break;
        }

        while (shared->state == STATE_WAIT) {
            pthread_cond_wait(&shared->c, &shared->m);
        }

        payload = shared->payload;

        shared->state = STATE_WAIT;

        pthread_mutex_unlock(&shared->m);

        if (payload) {
            int it = 0,
                 end = shared->numElements;

            while (it < end) {
                printf("Thread #%ld got payload %p(%d)=%d\n", 
                    pthread_self(), payload, it, payload[it]);
                it++;
            }

            free(payload);
        }
    } while(1);

    pthread_exit(NULL);
}

所以我们在routine调用pthread_cond_wait时醒来,状态已经改变,所以我们跳出循环,我们保存指向payload的指针,将状态重置为等待,然后释放互斥量。

此时main可以重新填充payload并唤醒下一个线程,同时当前工作线程可以处理,然后释放payload。

一些建议:

  • 始终使用尽可能少的互斥锁和条件变量 (KISS)
  • 研究条件变量的原子性
  • 始终遵循有关获取和释放互斥量以及条件变量信号的基本规则:
    • 如果您锁定了它,请将其解锁。
    • 永远只等待 某些东西:绝对需要预测等待循环,一直都是。

如果你不能重现我所做的,那就拿出代码并尝试对其进行扩展;您需要做的第一件事是能够正常关闭进程(输入 shared_cleanup),也许您需要可变大小的有效负载,或者原始问题中未提及的其他一些要求。

关于 printf 的注意事项 ... 不能保证附加到流中是原子的,碰巧大多数时候在 *nix 上都是 ... 因为我们是只是做展示和讲述,我们不需要关心这个......通常,不要依赖任何流操作的原子性......