多线程 - 只有在所有线程都完成任务后才继续

Multithreading - Continuing only after ALL threads have finished a task

我正在尝试使用多线程实现多线程(用户可以在启动程序时输入工作线程数 = 线程数) 每个线程调用 functionA 和之后的 functionB。但是在 functionB 之前应该只在所有线程之后执行 调用了函数A。那是我的伪代码:

void* worker_do(void* worker_id)
{
  functionA((size_t) worker_id);
  // First thread should only start functionB after ALL threads
  // are finished with functionA
  functionB((size_t) worker_id);
  return NULL;
}

// I am not allowed to change pthread_create and pthread_join here
int main()
{
  // should be a flexible value
  ssize_t num_workers = 20;
  pthread_t* workers  = malloc(num_workers*sizeof(pthread_t));

  for(ssize_t i = 0; i < num_workers; i++)
    pthread_create(&workers[i], NULL, worker_do, (void*) i);

  for(ssize_t i = 0; i < num_workers; i++)
    pthread_join(workers[i], NULL);

  free(workers);

  return 0;
}

我用谷歌搜索并找到了 "condition variables" 的可能性。但我不确定他们是否必须针对条件实施

IF last_thread_has_called_functionA THEN start_calling_fuctionB

或者条件变量不是解决这个问题的正确工具?

非常感谢提示我如何实现它...

bw 罗伯特

我假设 functionA()functionB() 可以由线程并行执行,因为您当前的代码中没有互斥锁保护。

为了解决您的问题,您可以使用简单的轮询机制。 functionA() 执行后,每个线程都会递增一个计数器。所有线程都将等待,直到计数器等于创建的线程数。

对于这种方法,您需要有一个在所有线程中通用的互斥体和计数器。为了简化代码,我使用了一个全局变量。

unsigned int num_threads = 0;
unsigned int num_threads_completed_functionA = 0;
pthread_mutex_t lock;

void* worker_do(void* worker_id)
{
  functionA((size_t) worker_id);
  // First thread should only start functionB after ALL threads are finished with functionA

  //Lock the mutex and update the counter
  pthread_mutex_lock(&lock);
  num_threads_completed_functionA++;
  pthread_mutex_unlock(&lock);

  while(1)
  {
    //Lock mutex and check how many threads completed execution of functionA()
    pthread_mutex_lock(&lock);
    if(num_threads_completed_functionA == num_threads)
    {
       //If all threads completed, then break the loop and proceed executing functionB()
       pthread_mutex_unlock(&lock);
       break;
    }
    pthread_mutex_unlock(&lock);
    usleep(1); //Sleep for some time
  }

  //ALL threads are finished with functionA
  functionB((size_t) worker_id);
  return NULL;
}

既然你问了,要用条件变量和互斥来做到这一点,你可以这样做:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <inttypes.h>

#define N_THREADS   10

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
unsigned int count = 0;

void functionA(intptr_t id)
{
    printf("functionA: %" PRIdPTR "\n", id);
}

void functionB(intptr_t id)
{
    printf("functionB: %" PRIdPTR "\n", id);
}

void* thread_proc(void* pv)
{
    intptr_t id = (intptr_t)pv;

    functionA(id);

    // lock the mutex to protect the predicate data (count)
    pthread_mutex_lock(&mtx);
    ++count;
    pthread_cond_broadcast(&cv);

    // wait for all threads to finish A
    while (count < N_THREADS)
        pthread_cond_wait(&cv, &mtx);

    // this is still owned by us. release it.
    pthread_mutex_unlock(&mtx);

    // now B
    functionB(id);

    return NULL;
}

int main()
{
    pthread_t thrds[N_THREADS];
    for (int i=0; i<N_THREADS; ++i)
        pthread_create(thrds+i, NULL, thread_proc, (void*)(intptr_t)(i+1));

    for (int i=0; i<N_THREADS; ++i)
        pthread_join(thrds[i], NULL);

    return EXIT_SUCCESS;
}

样本输出(变化)

functionA: 1
functionA: 4
functionA: 6
functionA: 3
functionA: 2
functionA: 8
functionA: 9
functionA: 7
functionA: 10
functionA: 5
functionB: 10
functionB: 9
functionB: 5
functionB: 7
functionB: 4
functionB: 6
functionB: 1
functionB: 2
functionB: 8
functionB: 3

也就是说,正如 Jonathan 在一般性评论中指出的那样,barrier 是解决此问题的更优雅的方法。我会 post 举个例子,但可惜我的环境不支持它们(遗憾的是,mac os x)。它们在 most Unix pthread 实现上可用,因此如果您的目标平台提供它们,我建议适当地调查它们。