多线程 - 只有在所有线程都完成任务后才继续
Multithreading - Continuing only after ALL threads have finished a task
我正在尝试使用多线程实现多线程(用户可以在启动程序时输入工作线程数 = 线程数)
每个线程调用 functionA 和之后的 functionB。但是在 functionB 之前应该只在所有线程之后执行
调用了函数A。那是我的伪代码:
void* worker_do(void* worker_id)
{
functionA((size_t) worker_id);
// First thread should only start functionB after ALL threads
// are finished with functionA
functionB((size_t) worker_id);
return NULL;
}
// I am not allowed to change pthread_create and pthread_join here
int main()
{
// should be a flexible value
ssize_t num_workers = 20;
pthread_t* workers = malloc(num_workers*sizeof(pthread_t));
for(ssize_t i = 0; i < num_workers; i++)
pthread_create(&workers[i], NULL, worker_do, (void*) i);
for(ssize_t i = 0; i < num_workers; i++)
pthread_join(workers[i], NULL);
free(workers);
return 0;
}
我用谷歌搜索并找到了 "condition variables" 的可能性。但我不确定他们是否必须针对条件实施
IF last_thread_has_called_functionA THEN start_calling_fuctionB
或者条件变量不是解决这个问题的正确工具?
非常感谢提示我如何实现它...
bw 罗伯特
我假设 functionA()
和 functionB()
可以由线程并行执行,因为您当前的代码中没有互斥锁保护。
为了解决您的问题,您可以使用简单的轮询机制。 functionA() 执行后,每个线程都会递增一个计数器。所有线程都将等待,直到计数器等于创建的线程数。
对于这种方法,您需要有一个在所有线程中通用的互斥体和计数器。为了简化代码,我使用了一个全局变量。
unsigned int num_threads = 0;
unsigned int num_threads_completed_functionA = 0;
pthread_mutex_t lock;
void* worker_do(void* worker_id)
{
functionA((size_t) worker_id);
// First thread should only start functionB after ALL threads are finished with functionA
//Lock the mutex and update the counter
pthread_mutex_lock(&lock);
num_threads_completed_functionA++;
pthread_mutex_unlock(&lock);
while(1)
{
//Lock mutex and check how many threads completed execution of functionA()
pthread_mutex_lock(&lock);
if(num_threads_completed_functionA == num_threads)
{
//If all threads completed, then break the loop and proceed executing functionB()
pthread_mutex_unlock(&lock);
break;
}
pthread_mutex_unlock(&lock);
usleep(1); //Sleep for some time
}
//ALL threads are finished with functionA
functionB((size_t) worker_id);
return NULL;
}
既然你问了,要用条件变量和互斥来做到这一点,你可以这样做:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <inttypes.h>
#define N_THREADS 10
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
unsigned int count = 0;
void functionA(intptr_t id)
{
printf("functionA: %" PRIdPTR "\n", id);
}
void functionB(intptr_t id)
{
printf("functionB: %" PRIdPTR "\n", id);
}
void* thread_proc(void* pv)
{
intptr_t id = (intptr_t)pv;
functionA(id);
// lock the mutex to protect the predicate data (count)
pthread_mutex_lock(&mtx);
++count;
pthread_cond_broadcast(&cv);
// wait for all threads to finish A
while (count < N_THREADS)
pthread_cond_wait(&cv, &mtx);
// this is still owned by us. release it.
pthread_mutex_unlock(&mtx);
// now B
functionB(id);
return NULL;
}
int main()
{
pthread_t thrds[N_THREADS];
for (int i=0; i<N_THREADS; ++i)
pthread_create(thrds+i, NULL, thread_proc, (void*)(intptr_t)(i+1));
for (int i=0; i<N_THREADS; ++i)
pthread_join(thrds[i], NULL);
return EXIT_SUCCESS;
}
样本输出(变化)
functionA: 1
functionA: 4
functionA: 6
functionA: 3
functionA: 2
functionA: 8
functionA: 9
functionA: 7
functionA: 10
functionA: 5
functionB: 10
functionB: 9
functionB: 5
functionB: 7
functionB: 4
functionB: 6
functionB: 1
functionB: 2
functionB: 8
functionB: 3
也就是说,正如 Jonathan 在一般性评论中指出的那样,barrier 是解决此问题的更优雅的方法。我会 post 举个例子,但可惜我的环境不支持它们(遗憾的是,mac os x)。它们在 most Unix pthread 实现上可用,因此如果您的目标平台提供它们,我建议适当地调查它们。
我正在尝试使用多线程实现多线程(用户可以在启动程序时输入工作线程数 = 线程数) 每个线程调用 functionA 和之后的 functionB。但是在 functionB 之前应该只在所有线程之后执行 调用了函数A。那是我的伪代码:
void* worker_do(void* worker_id)
{
functionA((size_t) worker_id);
// First thread should only start functionB after ALL threads
// are finished with functionA
functionB((size_t) worker_id);
return NULL;
}
// I am not allowed to change pthread_create and pthread_join here
int main()
{
// should be a flexible value
ssize_t num_workers = 20;
pthread_t* workers = malloc(num_workers*sizeof(pthread_t));
for(ssize_t i = 0; i < num_workers; i++)
pthread_create(&workers[i], NULL, worker_do, (void*) i);
for(ssize_t i = 0; i < num_workers; i++)
pthread_join(workers[i], NULL);
free(workers);
return 0;
}
我用谷歌搜索并找到了 "condition variables" 的可能性。但我不确定他们是否必须针对条件实施
IF last_thread_has_called_functionA THEN start_calling_fuctionB
或者条件变量不是解决这个问题的正确工具?
非常感谢提示我如何实现它...
bw 罗伯特
我假设 functionA()
和 functionB()
可以由线程并行执行,因为您当前的代码中没有互斥锁保护。
为了解决您的问题,您可以使用简单的轮询机制。 functionA() 执行后,每个线程都会递增一个计数器。所有线程都将等待,直到计数器等于创建的线程数。
对于这种方法,您需要有一个在所有线程中通用的互斥体和计数器。为了简化代码,我使用了一个全局变量。
unsigned int num_threads = 0;
unsigned int num_threads_completed_functionA = 0;
pthread_mutex_t lock;
void* worker_do(void* worker_id)
{
functionA((size_t) worker_id);
// First thread should only start functionB after ALL threads are finished with functionA
//Lock the mutex and update the counter
pthread_mutex_lock(&lock);
num_threads_completed_functionA++;
pthread_mutex_unlock(&lock);
while(1)
{
//Lock mutex and check how many threads completed execution of functionA()
pthread_mutex_lock(&lock);
if(num_threads_completed_functionA == num_threads)
{
//If all threads completed, then break the loop and proceed executing functionB()
pthread_mutex_unlock(&lock);
break;
}
pthread_mutex_unlock(&lock);
usleep(1); //Sleep for some time
}
//ALL threads are finished with functionA
functionB((size_t) worker_id);
return NULL;
}
既然你问了,要用条件变量和互斥来做到这一点,你可以这样做:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <inttypes.h>
#define N_THREADS 10
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
unsigned int count = 0;
void functionA(intptr_t id)
{
printf("functionA: %" PRIdPTR "\n", id);
}
void functionB(intptr_t id)
{
printf("functionB: %" PRIdPTR "\n", id);
}
void* thread_proc(void* pv)
{
intptr_t id = (intptr_t)pv;
functionA(id);
// lock the mutex to protect the predicate data (count)
pthread_mutex_lock(&mtx);
++count;
pthread_cond_broadcast(&cv);
// wait for all threads to finish A
while (count < N_THREADS)
pthread_cond_wait(&cv, &mtx);
// this is still owned by us. release it.
pthread_mutex_unlock(&mtx);
// now B
functionB(id);
return NULL;
}
int main()
{
pthread_t thrds[N_THREADS];
for (int i=0; i<N_THREADS; ++i)
pthread_create(thrds+i, NULL, thread_proc, (void*)(intptr_t)(i+1));
for (int i=0; i<N_THREADS; ++i)
pthread_join(thrds[i], NULL);
return EXIT_SUCCESS;
}
样本输出(变化)
functionA: 1
functionA: 4
functionA: 6
functionA: 3
functionA: 2
functionA: 8
functionA: 9
functionA: 7
functionA: 10
functionA: 5
functionB: 10
functionB: 9
functionB: 5
functionB: 7
functionB: 4
functionB: 6
functionB: 1
functionB: 2
functionB: 8
functionB: 3
也就是说,正如 Jonathan 在一般性评论中指出的那样,barrier 是解决此问题的更优雅的方法。我会 post 举个例子,但可惜我的环境不支持它们(遗憾的是,mac os x)。它们在 most Unix pthread 实现上可用,因此如果您的目标平台提供它们,我建议适当地调查它们。