C 测试问题中的循环缓冲区实现
Circular buffer implementation in C testing issue
我实现了自己的循环缓冲区。我想用两个线程来测试它。一个线程不断地写入缓冲区,而另一个线程不断地读取它。当读取一定数量的数据时,打印出基准。
此循环缓冲区的目的是写入和读取永远不会访问同一内存,因此不会引发竞争。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32
long write_count = 0;
long read_count = 0;
struct cBuf {
int first;
int last;
int max_items;
int item_size;
int valid_items;
unsigned char *buffer;
};
void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
buf -> first = 0;
buf -> last = 0;
buf -> max_items = max_items;
buf -> item_size = item_size;
buf -> valid_items = 0;
buf -> buffer = calloc(max_items, item_size);
return;
}
int isEmpty(struct cBuf *buf) {
if (buf -> valid_items == 0) {
return 1;
}
else {
return 0;
}
}
int push(struct cBuf *buf, unsigned char *data) {
if (buf -> valid_items >= buf -> max_items) {
// buffer full
return -1;
}
else {
// push data into the buffer
memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);
// update cBuf info
buf -> valid_items++;
buf -> last = (buf -> last + 1) % (buf -> max_items);
return 0;
}
}
int pop(struct cBuf *buf, unsigned char *new_buf) {
if (isEmpty(buf)) {
// buffer empty
return -1;
}
else {
// read data
memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);
// update cBuf info
buf -> first = (buf -> first + 1) % (buf -> max_items);
buf -> valid_items--;
return 0;
}
}
void *write_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
while (1) {
unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
if (push(buf, hash) == 0) {
write_count++;
//printf("put %lu items into the buffer. valid_items: %d\n", write_count, buf -> valid_items);
}
free(hash);
if (write_count == MAX_NUM_ITEM) {
break;
}
}
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total write = %lu\n\n", write_count);
return NULL;
}
void *read_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
unsigned char *new_buf = malloc(HASH_SIZE);
while (1) {
if (pop(buf, new_buf) == 0) {
read_count++;
//printf("pop %lu items from the buffer. valid_items: %d\n", read_count, buf -> valid_items);
}
if (read_count == MAX_NUM_ITEM) {
break;
}
}
free(new_buf);
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total read = %lu\n\n", read_count);
}
int main(int argc, char const *argv[]) {
struct cBuf buf;
init_cBuf(&buf, 200, HASH_SIZE);
pthread_t write_thd, read_thd;
double diff = 0.0, t1 = 0.0, t2 = 0.0;
t1 = clock ();
pthread_create(&read_thd, NULL, read_hash, &buf);
pthread_create(&write_thd, NULL, write_hash, &buf);
pthread_join(write_thd, NULL);
pthread_join(read_thd, NULL);
t2 = clock ();
diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
printf ("----------------\nTotal time: %lf second\n", diff);
printf ("Total write: %lu\n", write_count);
printf ("write per-second: %lf\n\n", write_count / diff);
return 0;
}
如果我在函数 "write_hash" 中留下 "printf" 并且注释 "read_hash" ,程序永远不会终止,这很奇怪。如果我取消注释这两个 "printf"s,程序将打印出所有 push 和 pop 信息直到结束,最终程序成功退出。
请帮我看看这是因为我的循环缓冲区实现有问题还是其他地方。
Luke,就像 EOF 所说的那样,您的代码中将存在竞争条件(可能有多个)。以下是我看到您的代码时的一些想法:
当线程共享内存时,您需要对该内存进行保护,以便一次只有一个线程可以访问它。您可以使用互斥体执行此操作(请参阅 pthread_mutex_lock 和 pthread_mutex_unlock)。
此外,我注意到您的读写线程中的 if 语句会检查压入和弹出的结果,以查看字符是否确实被压入或弹出。虽然这可能有效,但您可能应该使用信号量来同步您的线程。这就是我的意思:当你的写线程写入缓冲区时,你应该有类似 sem_post(&buff_count_sem);
的东西。然后,在您的读取线程读取一个字符之前,您应该有 sem_wait(&buff_count_sem);
以便您知道缓冲区中至少有一个字符。
同样的原理也适用于读线程。我建议读取线程在从缓冲区中弹出一个字节之后具有 sem_post(&space_left_sem)
并且写入线程在推送到缓冲区之前具有 sem_wait(&space_left_sem)
以确保缓冲区中有您正在尝试的字节的空间写。
以下是我对您的代码的建议更改(未测试):
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32
long write_count = 0;
long read_count = 0;
pthread_mutex_t buff_mutex;
sem_t buff_count_sem;
sem_t space_left_sem;
struct cBuf {
int first;
int last;
int max_items;
int item_size;
int valid_items;
unsigned char *buffer;
};
void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
buf -> first = 0;
buf -> last = 0;
buf -> max_items = max_items;
buf -> item_size = item_size;
buf -> valid_items = 0;
buf -> buffer = calloc(max_items, item_size);
return;
}
int isEmpty(struct cBuf *buf) {
if (buf -> valid_items == 0) {
return 1;
}
else {
return 0;
}
}
int push(struct cBuf *buf, unsigned char *data) {
pthread_mutex_lock(&buff_mutex);
if (buf -> valid_items >= buf -> max_items) {
// buffer full
pthread_mutex_unlock(&buff_mutex);
return -1;
}
else {
// push data into the buffer
memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);
// update cBuf info
buf -> valid_items++;
buf -> last = (buf -> last + 1) % (buf -> max_items);
pthread_mutex_unlock(&buff_mutex);
return 0;
}
}
int pop(struct cBuf *buf, unsigned char *new_buf) {
phthread_mutex_lock(&buff_mutex);
if (isEmpty(buf)) {
// buffer empty
pthread_mutex_unlock(&buff_mutex);
return -1;
}
else {
// read data
memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);
// update cBuf info
buf -> first = (buf -> first + 1) % (buf -> max_items);
buf -> valid_items--;
pthread_mutex_unlock(&buff_mutex);
return 0;
}
}
void *write_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
while (1) {
unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
sem_wait(&space_left_sem);
push(buf, hash);
write_count++;
sem_post(&buff_count_sem);
free(hash);
if (write_count == MAX_NUM_ITEM) {
break;
}
}
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total write = %lu\n\n", write_count);
return NULL;
}
void *read_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
unsigned char *new_buf = malloc(HASH_SIZE);
while (1) {
sem_wait(&buff_count_sem);
pop(buf, new_buf);
read_count++;
sem_post(&space_left_sem);
if (read_count == MAX_NUM_ITEM) {
break;
}
}
free(new_buf);
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total read = %lu\n\n", read_count);
}
int main(int argc, char const *argv[]) {
struct cBuf buf;
init_cBuf(&buf, 200, HASH_SIZE);
pthread_t write_thd, read_thd;
pthread_mutex_init(&buff_mutex);
sem_init(&buff_count_sem, 0, 0); // the last parameter is the initial value - initially 0 if no data is in the buffer
sem_init(&space_left_sem, 0, buf.max_items);
double diff = 0.0, t1 = 0.0, t2 = 0.0;
t1 = clock ();
pthread_create(&read_thd, NULL, read_hash, &buf);
pthread_create(&write_thd, NULL, write_hash, &buf);
pthread_join(write_thd, NULL);
pthread_join(read_thd, NULL);
t2 = clock ();
diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
printf ("----------------\nTotal time: %lf second\n", diff);
printf ("Total write: %lu\n", write_count);
printf ("write per-second: %lf\n\n", write_count / diff);
return 0;
}
我建议阅读有关如何正确使用线程的内容。
更新:
1.修正了一个错别字。
2. 您还需要在 isEmpty 中使用 mutex_lock 和 mutex_unlock 以及您为循环缓冲区编写的任何其他函数,其中多个线程可能访问同一个缓冲区。
我实现了自己的循环缓冲区。我想用两个线程来测试它。一个线程不断地写入缓冲区,而另一个线程不断地读取它。当读取一定数量的数据时,打印出基准。
此循环缓冲区的目的是写入和读取永远不会访问同一内存,因此不会引发竞争。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32
long write_count = 0;
long read_count = 0;
struct cBuf {
int first;
int last;
int max_items;
int item_size;
int valid_items;
unsigned char *buffer;
};
void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
buf -> first = 0;
buf -> last = 0;
buf -> max_items = max_items;
buf -> item_size = item_size;
buf -> valid_items = 0;
buf -> buffer = calloc(max_items, item_size);
return;
}
int isEmpty(struct cBuf *buf) {
if (buf -> valid_items == 0) {
return 1;
}
else {
return 0;
}
}
int push(struct cBuf *buf, unsigned char *data) {
if (buf -> valid_items >= buf -> max_items) {
// buffer full
return -1;
}
else {
// push data into the buffer
memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);
// update cBuf info
buf -> valid_items++;
buf -> last = (buf -> last + 1) % (buf -> max_items);
return 0;
}
}
int pop(struct cBuf *buf, unsigned char *new_buf) {
if (isEmpty(buf)) {
// buffer empty
return -1;
}
else {
// read data
memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);
// update cBuf info
buf -> first = (buf -> first + 1) % (buf -> max_items);
buf -> valid_items--;
return 0;
}
}
void *write_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
while (1) {
unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
if (push(buf, hash) == 0) {
write_count++;
//printf("put %lu items into the buffer. valid_items: %d\n", write_count, buf -> valid_items);
}
free(hash);
if (write_count == MAX_NUM_ITEM) {
break;
}
}
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total write = %lu\n\n", write_count);
return NULL;
}
void *read_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
unsigned char *new_buf = malloc(HASH_SIZE);
while (1) {
if (pop(buf, new_buf) == 0) {
read_count++;
//printf("pop %lu items from the buffer. valid_items: %d\n", read_count, buf -> valid_items);
}
if (read_count == MAX_NUM_ITEM) {
break;
}
}
free(new_buf);
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total read = %lu\n\n", read_count);
}
int main(int argc, char const *argv[]) {
struct cBuf buf;
init_cBuf(&buf, 200, HASH_SIZE);
pthread_t write_thd, read_thd;
double diff = 0.0, t1 = 0.0, t2 = 0.0;
t1 = clock ();
pthread_create(&read_thd, NULL, read_hash, &buf);
pthread_create(&write_thd, NULL, write_hash, &buf);
pthread_join(write_thd, NULL);
pthread_join(read_thd, NULL);
t2 = clock ();
diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
printf ("----------------\nTotal time: %lf second\n", diff);
printf ("Total write: %lu\n", write_count);
printf ("write per-second: %lf\n\n", write_count / diff);
return 0;
}
如果我在函数 "write_hash" 中留下 "printf" 并且注释 "read_hash" ,程序永远不会终止,这很奇怪。如果我取消注释这两个 "printf"s,程序将打印出所有 push 和 pop 信息直到结束,最终程序成功退出。
请帮我看看这是因为我的循环缓冲区实现有问题还是其他地方。
Luke,就像 EOF 所说的那样,您的代码中将存在竞争条件(可能有多个)。以下是我看到您的代码时的一些想法:
当线程共享内存时,您需要对该内存进行保护,以便一次只有一个线程可以访问它。您可以使用互斥体执行此操作(请参阅 pthread_mutex_lock 和 pthread_mutex_unlock)。
此外,我注意到您的读写线程中的 if 语句会检查压入和弹出的结果,以查看字符是否确实被压入或弹出。虽然这可能有效,但您可能应该使用信号量来同步您的线程。这就是我的意思:当你的写线程写入缓冲区时,你应该有类似 sem_post(&buff_count_sem);
的东西。然后,在您的读取线程读取一个字符之前,您应该有 sem_wait(&buff_count_sem);
以便您知道缓冲区中至少有一个字符。
同样的原理也适用于读线程。我建议读取线程在从缓冲区中弹出一个字节之后具有 sem_post(&space_left_sem)
并且写入线程在推送到缓冲区之前具有 sem_wait(&space_left_sem)
以确保缓冲区中有您正在尝试的字节的空间写。
以下是我对您的代码的建议更改(未测试):
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32
long write_count = 0;
long read_count = 0;
pthread_mutex_t buff_mutex;
sem_t buff_count_sem;
sem_t space_left_sem;
struct cBuf {
int first;
int last;
int max_items;
int item_size;
int valid_items;
unsigned char *buffer;
};
void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
buf -> first = 0;
buf -> last = 0;
buf -> max_items = max_items;
buf -> item_size = item_size;
buf -> valid_items = 0;
buf -> buffer = calloc(max_items, item_size);
return;
}
int isEmpty(struct cBuf *buf) {
if (buf -> valid_items == 0) {
return 1;
}
else {
return 0;
}
}
int push(struct cBuf *buf, unsigned char *data) {
pthread_mutex_lock(&buff_mutex);
if (buf -> valid_items >= buf -> max_items) {
// buffer full
pthread_mutex_unlock(&buff_mutex);
return -1;
}
else {
// push data into the buffer
memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);
// update cBuf info
buf -> valid_items++;
buf -> last = (buf -> last + 1) % (buf -> max_items);
pthread_mutex_unlock(&buff_mutex);
return 0;
}
}
int pop(struct cBuf *buf, unsigned char *new_buf) {
phthread_mutex_lock(&buff_mutex);
if (isEmpty(buf)) {
// buffer empty
pthread_mutex_unlock(&buff_mutex);
return -1;
}
else {
// read data
memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);
// update cBuf info
buf -> first = (buf -> first + 1) % (buf -> max_items);
buf -> valid_items--;
pthread_mutex_unlock(&buff_mutex);
return 0;
}
}
void *write_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
while (1) {
unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
sem_wait(&space_left_sem);
push(buf, hash);
write_count++;
sem_post(&buff_count_sem);
free(hash);
if (write_count == MAX_NUM_ITEM) {
break;
}
}
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total write = %lu\n\n", write_count);
return NULL;
}
void *read_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
unsigned char *new_buf = malloc(HASH_SIZE);
while (1) {
sem_wait(&buff_count_sem);
pop(buf, new_buf);
read_count++;
sem_post(&space_left_sem);
if (read_count == MAX_NUM_ITEM) {
break;
}
}
free(new_buf);
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total read = %lu\n\n", read_count);
}
int main(int argc, char const *argv[]) {
struct cBuf buf;
init_cBuf(&buf, 200, HASH_SIZE);
pthread_t write_thd, read_thd;
pthread_mutex_init(&buff_mutex);
sem_init(&buff_count_sem, 0, 0); // the last parameter is the initial value - initially 0 if no data is in the buffer
sem_init(&space_left_sem, 0, buf.max_items);
double diff = 0.0, t1 = 0.0, t2 = 0.0;
t1 = clock ();
pthread_create(&read_thd, NULL, read_hash, &buf);
pthread_create(&write_thd, NULL, write_hash, &buf);
pthread_join(write_thd, NULL);
pthread_join(read_thd, NULL);
t2 = clock ();
diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
printf ("----------------\nTotal time: %lf second\n", diff);
printf ("Total write: %lu\n", write_count);
printf ("write per-second: %lf\n\n", write_count / diff);
return 0;
}
我建议阅读有关如何正确使用线程的内容。
更新: 1.修正了一个错别字。 2. 您还需要在 isEmpty 中使用 mutex_lock 和 mutex_unlock 以及您为循环缓冲区编写的任何其他函数,其中多个线程可能访问同一个缓冲区。