C 测试问题中的循环缓冲区实现

Circular buffer implementation in C testing issue

我实现了自己的循环缓冲区。我想用两个线程来测试它。一个线程不断地写入缓冲区,而另一个线程不断地读取它。当读取一定数量的数据时,打印出基准。

此循环缓冲区的目的是写入和读取永远不会访问同一内存,因此不会引发竞争。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>

#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32

long write_count = 0;
long read_count = 0;

struct cBuf {
    int first;
    int last;
    int max_items;
    int item_size;
    int valid_items;
    unsigned char *buffer;
};


void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
    buf -> first = 0;
    buf -> last = 0;
    buf -> max_items = max_items;
    buf -> item_size = item_size;
    buf -> valid_items = 0;
    buf -> buffer = calloc(max_items, item_size);
    return;
}


int isEmpty(struct cBuf *buf) {
    if (buf -> valid_items == 0) {
        return 1;
    }
    else {
        return 0;
    }
}

int push(struct cBuf *buf, unsigned char *data) {
    if (buf -> valid_items >= buf -> max_items) {
        // buffer full
        return -1;
    }
    else {
        // push data into the buffer
        memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);

        // update cBuf info
        buf -> valid_items++;
        buf -> last = (buf -> last + 1) % (buf -> max_items);  
        return 0;
    }
}

int pop(struct cBuf *buf, unsigned char *new_buf) {
    if (isEmpty(buf)) {
        // buffer empty
        return -1;
    }
    else {
        // read data
        memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);

        // update cBuf info
        buf -> first = (buf -> first + 1) % (buf -> max_items);
        buf -> valid_items--;
        return 0;
    }
}

void *write_hash(void *ptr) {
    struct cBuf *buf = (struct cBuf *)(ptr);

    while (1) {
        unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
        if (push(buf, hash) == 0) {
            write_count++;
            //printf("put %lu items into the buffer. valid_items: %d\n", write_count, buf -> valid_items);
        }
        free(hash);
        if (write_count == MAX_NUM_ITEM) {
            break;
        }
    }

    printf ("  thread id  = %lu\n", (long unsigned) (pthread_self ()));
    printf ("  total write = %lu\n\n", write_count);
    return NULL;
}

void *read_hash(void *ptr) {

    struct cBuf *buf = (struct cBuf *)(ptr);
    unsigned char *new_buf = malloc(HASH_SIZE);
    while (1) {
        if (pop(buf, new_buf) == 0) {
            read_count++;
            //printf("pop %lu items from the buffer. valid_items: %d\n", read_count, buf -> valid_items);
        }
        if (read_count == MAX_NUM_ITEM) {
            break;
        }
    }
    free(new_buf);

    printf ("  thread id  = %lu\n", (long unsigned) (pthread_self ()));
    printf ("  total read = %lu\n\n", read_count);

}

int main(int argc, char const *argv[]) {

    struct cBuf buf;
    init_cBuf(&buf, 200, HASH_SIZE);

    pthread_t write_thd, read_thd;

    double diff = 0.0, t1 = 0.0, t2 = 0.0;
    t1 = clock ();

    pthread_create(&read_thd, NULL, read_hash, &buf);
    pthread_create(&write_thd, NULL, write_hash, &buf);

    pthread_join(write_thd, NULL);
    pthread_join(read_thd, NULL);


    t2 = clock ();
    diff = (double)((t2 - t1) / CLOCKS_PER_SEC);

    printf ("----------------\nTotal time: %lf second\n", diff);
    printf ("Total write: %lu\n", write_count);
    printf ("write per-second: %lf\n\n", write_count / diff);

    return 0;
}

如果我在函数 "write_hash" 中留下 "printf" 并且注释 "read_hash" ,程序永远不会终止,这很奇怪。如果我取消注释这两个 "printf"s,程序将打印出所有 push 和 pop 信息直到结束,最终程序成功退出。

请帮我看看这是因为我的循环缓冲区实现有问题还是其他地方。

Luke,就像 EOF 所说的那样,您的代码中将存在竞争条件(可能有多个)。以下是我看到您的代码时的一些想法:

当线程共享内存时,您需要对该内存进行保护,以便一次只有一个线程可以访问它。您可以使用互斥体执行此操作(请参阅 pthread_mutex_lock 和 pthread_mutex_unlock)。

此外,我注意到您的读写线程中的 if 语句会检查压入和弹出的结果,以查看字符是否确实被压入或弹出。虽然这可能有效,但您可能应该使用信号量来同步您的线程。这就是我的意思:当你的写线程写入缓冲区时,你应该有类似 sem_post(&buff_count_sem); 的东西。然后,在您的读取线程读取一个字符之前,您应该有 sem_wait(&buff_count_sem); 以便您知道缓冲区中至少有一个字符。

同样的原理也适用于读线程。我建议读取线程在从缓冲区中弹出一个字节之后具有 sem_post(&space_left_sem) 并且写入线程在推送到缓冲区之前具有 sem_wait(&space_left_sem) 以确保缓冲区中有您正在尝试的字节的空间写。

以下是我对您的代码的建议更改(未测试):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>

#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32

long write_count = 0;
long read_count = 0;
pthread_mutex_t buff_mutex;
sem_t buff_count_sem;
sem_t space_left_sem;

struct cBuf {
    int first;
    int last;
    int max_items;
    int item_size;
    int valid_items;
    unsigned char *buffer;
};


void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
    buf -> first = 0;
    buf -> last = 0;
    buf -> max_items = max_items;
    buf -> item_size = item_size;
    buf -> valid_items = 0;
    buf -> buffer = calloc(max_items, item_size);
    return;
}


int isEmpty(struct cBuf *buf) {
    if (buf -> valid_items == 0) {
        return 1;
    }
    else {
        return 0;
    }
}

int push(struct cBuf *buf, unsigned char *data) {
    pthread_mutex_lock(&buff_mutex);
    if (buf -> valid_items >= buf -> max_items) {
        // buffer full
        pthread_mutex_unlock(&buff_mutex);
        return -1;
    }
    else {
        // push data into the buffer
        memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);

        // update cBuf info
        buf -> valid_items++;
        buf -> last = (buf -> last + 1) % (buf -> max_items);  
        pthread_mutex_unlock(&buff_mutex);
        return 0;
    }
}

int pop(struct cBuf *buf, unsigned char *new_buf) {
    phthread_mutex_lock(&buff_mutex);
    if (isEmpty(buf)) {
        // buffer empty
        pthread_mutex_unlock(&buff_mutex);
        return -1;
    }
    else {
        // read data
        memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);

        // update cBuf info
        buf -> first = (buf -> first + 1) % (buf -> max_items);
        buf -> valid_items--;
        pthread_mutex_unlock(&buff_mutex);
        return 0;
    }
}

void *write_hash(void *ptr) {
    struct cBuf *buf = (struct cBuf *)(ptr);

    while (1) {
        unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.

        sem_wait(&space_left_sem);
        push(buf, hash);
        write_count++;
        sem_post(&buff_count_sem);
        free(hash);
        if (write_count == MAX_NUM_ITEM) {
            break;
        }
    }

    printf ("  thread id  = %lu\n", (long unsigned) (pthread_self ()));
    printf ("  total write = %lu\n\n", write_count);
    return NULL;
}

void *read_hash(void *ptr) {

    struct cBuf *buf = (struct cBuf *)(ptr);
    unsigned char *new_buf = malloc(HASH_SIZE);
    while (1) {

        sem_wait(&buff_count_sem);
        pop(buf, new_buf);
        read_count++;
        sem_post(&space_left_sem);

        if (read_count == MAX_NUM_ITEM) {
            break;
        }
    }
    free(new_buf);

    printf ("  thread id  = %lu\n", (long unsigned) (pthread_self ()));
    printf ("  total read = %lu\n\n", read_count);

}

int main(int argc, char const *argv[]) {

    struct cBuf buf;
    init_cBuf(&buf, 200, HASH_SIZE);

    pthread_t write_thd, read_thd;

    pthread_mutex_init(&buff_mutex);
    sem_init(&buff_count_sem, 0, 0); // the last parameter is the initial value - initially 0 if no data is in the buffer
    sem_init(&space_left_sem, 0, buf.max_items);

    double diff = 0.0, t1 = 0.0, t2 = 0.0;
    t1 = clock ();

    pthread_create(&read_thd, NULL, read_hash, &buf);
    pthread_create(&write_thd, NULL, write_hash, &buf);

    pthread_join(write_thd, NULL);
    pthread_join(read_thd, NULL);


    t2 = clock ();
    diff = (double)((t2 - t1) / CLOCKS_PER_SEC);

    printf ("----------------\nTotal time: %lf second\n", diff);
    printf ("Total write: %lu\n", write_count);
    printf ("write per-second: %lf\n\n", write_count / diff);

    return 0;
}

我建议阅读有关如何正确使用线程的内容。

更新: 1.修正了一个错别字。 2. 您还需要在 isEmpty 中使用 mutex_lock 和 mutex_unlock 以及您为循环缓冲区编写的任何其他函数,其中多个线程可能访问同一个缓冲区。