在 C 中使用信号量和互斥锁实现生产者-消费者
Producer-Consumer implementation using semaphores and mutex locks in C
我正在使用以下代码在 C 中实现一个基本的产品消费者问题。不过在极少数情况下,我会得到如下所述的错误输出。
Received 值达到 0,但是当它达到 0 时仍然有生产者周期要离开,然后接收者不断得到 0 值并且输出出错。
请帮助我了解我的代码有什么问题。谢谢
C 代码:
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
sem_t semaphore;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int queue[50];
int queueLength;
void *producer( void * param)
{
for ( int i=0; i<50; i++ )
{
// Add item to queue
pthread_mutex_lock( &mutex );
queue[ queueLength++ ] = i;
printf("Sent %d\n", i);
pthread_mutex_unlock( &mutex );
// Signal semaphore
sem_post( &semaphore );
}
}
void *consumer(void * param)
{
for ( int i=0; i<50; i++ )
{
int item;
// Wait if nothing in queue
if (queueLength<1) { sem_wait(&semaphore); }
pthread_mutex_lock( &mutex );
item = queue[ -- queueLength ];
pthread_mutex_unlock( &mutex );
printf("Received %i\n", item);
}
}
int main()
{
pthread_t threads[2];
sem_init( &semaphore, 0, 1 );
pthread_create( &threads[0], 0, producer, 0 );
pthread_create( &threads[1], 0, consumer, 0 );
pthread_join( threads[0], 0 );
pthread_join( threads[1], 0 );
sem_destroy( &semaphore );
}
错误输出:
Sent 0
Sent 1
Sent 2
Sent 3
Sent 4
Sent 5
Sent 6
Sent 7
Sent 8
Sent 9
Sent 10
Sent 11
Sent 12
Sent 13
Sent 14
Sent 15
Sent 16
Sent 17
Sent 18
Sent 19
Sent 20
Sent 21
Sent 22
Sent 23
Sent 24
Sent 25
Sent 26
Sent 27
Sent 28
Received 28
Received 27
Received 26
Received 25
Received 24
Received 23
Received 22
Received 21
Received 20
Received 19
Received 18
Received 17
Received 16
Received 15
Received 14
Received 13
Received 12
Received 11
Received 10
Received 9
Received 8
Received 7
Received 6
Received 5
Received 4
Received 3
Received 2
Received 1
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received -8
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 1
Sent 29
Sent 30
Sent 31
Sent 32
Sent 33
Sent 34
Sent 35
Sent 36
Sent 37
Sent 38
Sent 39
Sent 40
Sent 41
Sent 42
Sent 43
Sent 44
Sent 45
Sent 46
Sent 47
Sent 48
Sent 49
当您的消费者线程启动时,信号量已经发出信号,并且 queueLength
大于零。您的消费者线程将立即开始出队(因为 queueLength
不为零),直到它达到零。
一旦队列耗尽,在下一次消费者循环迭代中(使用queueLength == 0
),它将进入if
子句并尝试等待信号量(已经发出信号),并立即获取锁。
所以信号量不能在 if
条件内,这意味着你可以使用类似的东西:
for ( int i=0; i<50; i++ )
{
int item;
// wait for signal
sem_wait(&semaphore);
pthread_mutex_lock( &mutex );
item = queue[--queueLength];
pthread_mutex_unlock( &mutex );
printf("Received %i\n", item);
}
我的建议(如果这是您的实际问题)是改为创建一个简单的循环缓冲区。如果实施得当(即只有一个生产者,一个消费者,如果生产者只移动 head
,而消费者只移动 tail
,并假设两个变量都是 int
或更小 - - 即可以原子方式读取和写入),您根本不需要 mutex
(仅用于信号化的信号量)。
你的例子中名为 queue
的数组实际上是一个 stack (LIFO),这在你通常想要出队的 producer/consumer 模式中是不常见的并首先处理最旧的项目。
我正在使用以下代码在 C 中实现一个基本的产品消费者问题。不过在极少数情况下,我会得到如下所述的错误输出。
Received 值达到 0,但是当它达到 0 时仍然有生产者周期要离开,然后接收者不断得到 0 值并且输出出错。
请帮助我了解我的代码有什么问题。谢谢
C 代码:
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
sem_t semaphore;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int queue[50];
int queueLength;
void *producer( void * param)
{
for ( int i=0; i<50; i++ )
{
// Add item to queue
pthread_mutex_lock( &mutex );
queue[ queueLength++ ] = i;
printf("Sent %d\n", i);
pthread_mutex_unlock( &mutex );
// Signal semaphore
sem_post( &semaphore );
}
}
void *consumer(void * param)
{
for ( int i=0; i<50; i++ )
{
int item;
// Wait if nothing in queue
if (queueLength<1) { sem_wait(&semaphore); }
pthread_mutex_lock( &mutex );
item = queue[ -- queueLength ];
pthread_mutex_unlock( &mutex );
printf("Received %i\n", item);
}
}
int main()
{
pthread_t threads[2];
sem_init( &semaphore, 0, 1 );
pthread_create( &threads[0], 0, producer, 0 );
pthread_create( &threads[1], 0, consumer, 0 );
pthread_join( threads[0], 0 );
pthread_join( threads[1], 0 );
sem_destroy( &semaphore );
}
错误输出:
Sent 0
Sent 1
Sent 2
Sent 3
Sent 4
Sent 5
Sent 6
Sent 7
Sent 8
Sent 9
Sent 10
Sent 11
Sent 12
Sent 13
Sent 14
Sent 15
Sent 16
Sent 17
Sent 18
Sent 19
Sent 20
Sent 21
Sent 22
Sent 23
Sent 24
Sent 25
Sent 26
Sent 27
Sent 28
Received 28
Received 27
Received 26
Received 25
Received 24
Received 23
Received 22
Received 21
Received 20
Received 19
Received 18
Received 17
Received 16
Received 15
Received 14
Received 13
Received 12
Received 11
Received 10
Received 9
Received 8
Received 7
Received 6
Received 5
Received 4
Received 3
Received 2
Received 1
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received -8
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 0
Received 1
Sent 29
Sent 30
Sent 31
Sent 32
Sent 33
Sent 34
Sent 35
Sent 36
Sent 37
Sent 38
Sent 39
Sent 40
Sent 41
Sent 42
Sent 43
Sent 44
Sent 45
Sent 46
Sent 47
Sent 48
Sent 49
当您的消费者线程启动时,信号量已经发出信号,并且 queueLength
大于零。您的消费者线程将立即开始出队(因为 queueLength
不为零),直到它达到零。
一旦队列耗尽,在下一次消费者循环迭代中(使用queueLength == 0
),它将进入if
子句并尝试等待信号量(已经发出信号),并立即获取锁。
所以信号量不能在 if
条件内,这意味着你可以使用类似的东西:
for ( int i=0; i<50; i++ )
{
int item;
// wait for signal
sem_wait(&semaphore);
pthread_mutex_lock( &mutex );
item = queue[--queueLength];
pthread_mutex_unlock( &mutex );
printf("Received %i\n", item);
}
我的建议(如果这是您的实际问题)是改为创建一个简单的循环缓冲区。如果实施得当(即只有一个生产者,一个消费者,如果生产者只移动 head
,而消费者只移动 tail
,并假设两个变量都是 int
或更小 - - 即可以原子方式读取和写入),您根本不需要 mutex
(仅用于信号化的信号量)。
你的例子中名为 queue
的数组实际上是一个 stack (LIFO),这在你通常想要出队的 producer/consumer 模式中是不常见的并首先处理最旧的项目。