我应该在这里使用哪个同步原语?
Which synchronization primitive should I employ here?
while(1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
puts((char *) message);
return NULL;
}
上面的例子不是MRE但是非常简单:
我有一个主线程,它有一个不断消耗消息队列中消息的循环。收到新消息后,它会存储在本地 message_buffer
缓冲区中。然后,新线程被派生到所述新消息的"take care",因此消息缓冲区的地址被传递到handle_message
,新线程随后执行。
问题
通常,2 个线程会打印相同的消息,即使我可以 100% 确定队列中的消息不相同。
我不完全确定,但我想我明白为什么会这样:
假设我将 2 条不同的消息推送到 mqueue,然后我才开始使用它们。
在 while
循环的第一次迭代中,消息将从队列中获取并保存到 message_buffer
。将生成一个新线程并将 message_length
的地址传递给它。但是该线程可能不够快,无法在 下一条消息被消耗之前(在循环的下一次迭代中)将缓冲区的内容打印到流 ,并且 message_buffer
随后被覆盖。因此,第一个和第二个线程现在打印相同的值。
我的问题是:解决这个问题最有效的方法是什么?我是并行编程的新手,threading/pthreads 不同的同步原语让我不知所措。
互斥问题
static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
while(1) {
char message_buffer[SIZE];
pthread_mutex_lock(&m);
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
pthred_mutex_unlock(&m);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
char own_buffer[SIZE];
pthread_mutex_lock(&m);
strncpy(own_buffer, (char *) message, SIZE);
pthread_mutex_unlock(&m);
puts(own_buffer);
return NULL;
}
我认为我当前的互斥实现不正确,因为线程仍在接收重复的消息。主线程可以锁定互斥锁,将消息消耗到缓冲区中,解锁互斥锁,生成一个线程,但该线程仍然可能挂起,而主线程可能只是再次重写缓冲区(因为缓冲区互斥锁从未被新锁定线程),有效地使我当前的互斥锁实现无用?我该如何克服这个问题?
问题是您在保证线程已完成该内存之前就结束了包含 message_buffer
的循环。
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if (ret) { /* error handling */ }
/****** Can't go beyond here until thread is done with message_buffer. ******/
}
void * handle_message (void * message) {
char own_buffer[SIZE];
strncpy(own_buffer, (char *) message, SIZE);
/******* Only now can the caller loop back. ******/
puts(own_buffer);
return NULL;
}
您可以使用信号量或类似工具。
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int copied = 0;
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if (ret) { /* error handling */ }
// Wait until threads is done with message_buffer.
pthread_mutex_lock(&mutex);
while (!copied) pthread_cond_wait(&cond, &mutex);
copied = 0;
pthread_mutex_unlock(&mutex);
}
void * handle_message (void * message) {
char own_buffer[SIZE];
strncpy(own_buffer, (char *) message, SIZE);
// Done with caller's buffer.
// Signal caller to continue.
pthread_mutex_lock(&mutex);
copied = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
puts(own_buffer);
return NULL;
}
(添加的块有效地执行了信号量操作。请参阅 的最后一个片段以获得更通用的实现。)
但是有一个更简单的解决方案:在创建线程之前制作副本。
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, strdup(message_buffer));
if (ret) { /* error handling */ }
}
void * handle_message (void * message) {
char * own_buffer = message;
puts(own_buffer);
free(own_buffer);
return NULL;
}
while(1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
puts((char *) message);
return NULL;
}
上面的例子不是MRE但是非常简单:
我有一个主线程,它有一个不断消耗消息队列中消息的循环。收到新消息后,它会存储在本地 message_buffer
缓冲区中。然后,新线程被派生到所述新消息的"take care",因此消息缓冲区的地址被传递到handle_message
,新线程随后执行。
问题
通常,2 个线程会打印相同的消息,即使我可以 100% 确定队列中的消息不相同。
我不完全确定,但我想我明白为什么会这样:
假设我将 2 条不同的消息推送到 mqueue,然后我才开始使用它们。
在 while
循环的第一次迭代中,消息将从队列中获取并保存到 message_buffer
。将生成一个新线程并将 message_length
的地址传递给它。但是该线程可能不够快,无法在 下一条消息被消耗之前(在循环的下一次迭代中)将缓冲区的内容打印到流 ,并且 message_buffer
随后被覆盖。因此,第一个和第二个线程现在打印相同的值。
我的问题是:解决这个问题最有效的方法是什么?我是并行编程的新手,threading/pthreads 不同的同步原语让我不知所措。
互斥问题
static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
while(1) {
char message_buffer[SIZE];
pthread_mutex_lock(&m);
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
pthred_mutex_unlock(&m);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
char own_buffer[SIZE];
pthread_mutex_lock(&m);
strncpy(own_buffer, (char *) message, SIZE);
pthread_mutex_unlock(&m);
puts(own_buffer);
return NULL;
}
我认为我当前的互斥实现不正确,因为线程仍在接收重复的消息。主线程可以锁定互斥锁,将消息消耗到缓冲区中,解锁互斥锁,生成一个线程,但该线程仍然可能挂起,而主线程可能只是再次重写缓冲区(因为缓冲区互斥锁从未被新锁定线程),有效地使我当前的互斥锁实现无用?我该如何克服这个问题?
问题是您在保证线程已完成该内存之前就结束了包含 message_buffer
的循环。
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if (ret) { /* error handling */ }
/****** Can't go beyond here until thread is done with message_buffer. ******/
}
void * handle_message (void * message) {
char own_buffer[SIZE];
strncpy(own_buffer, (char *) message, SIZE);
/******* Only now can the caller loop back. ******/
puts(own_buffer);
return NULL;
}
您可以使用信号量或类似工具。
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int copied = 0;
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if (ret) { /* error handling */ }
// Wait until threads is done with message_buffer.
pthread_mutex_lock(&mutex);
while (!copied) pthread_cond_wait(&cond, &mutex);
copied = 0;
pthread_mutex_unlock(&mutex);
}
void * handle_message (void * message) {
char own_buffer[SIZE];
strncpy(own_buffer, (char *) message, SIZE);
// Done with caller's buffer.
// Signal caller to continue.
pthread_mutex_lock(&mutex);
copied = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
puts(own_buffer);
return NULL;
}
(添加的块有效地执行了信号量操作。请参阅
但是有一个更简单的解决方案:在创建线程之前制作副本。
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, strdup(message_buffer));
if (ret) { /* error handling */ }
}
void * handle_message (void * message) {
char * own_buffer = message;
puts(own_buffer);
free(own_buffer);
return NULL;
}