使用 pthreads 的简单流水线

Simple pipelining using pthreads

最近我开始研究 pthreads 并尝试使用 pthreads 实现软件流水线。为此,我自己编写了一个玩具程序,其中一个类似程序将成为我主要项目的一部分。

所以在这个程序中,主线程创建并输入输出整数缓冲区类型,然后创建一个 主线程 并将这些缓冲区传递给 主线程 主线程 依次创建两个工作线程

输入和从main传递到输出缓冲区 =37=]master thread 的大小为 nxk(例如 5x10 的大小 int)。 主线程 迭代大小为 k(即 10)的块 n(即 5)次数。 在主线程中有一个循环运行ning k(这里是5)次。在 k 的每次迭代中, 主线程 对大小为 n 的输入数据的一部分进行一些操作并将它放在 公共缓冲区 masterworker threads 共享。然后 主线程 通知工作线程数据已放入 公共缓冲区

两个工作线程等待来自主线程的信号,如果公共缓冲区 准备好了。 公共缓冲区上的操作在工作线程中分成一半。这意味着一个 工作线程 将在前半部分工作,而另一个 工作线程 将在 公共部分的下半部分工作缓冲区。 一旦 工作线程 主线程 获得信号,每个 工作线程 都会执行一些操作在他们的一半数据上,并将其复制到 输出缓冲区 。然后 工作线程 通过设置标志通知 主线程 它们在 公共缓冲区 上的操作已完成值。为 工作线程 创建了一组标志。 master 线程 继续检查是否设置了所有标志,这基本上意味着所有 worker 线程 完成了它们在 common 上的操作buffermaster thread 可以将下一个数据块安全地放入 common buffer worker thread's消费。

所以基本上 masterworker threads 之间以管道方式进行通信。最后,我在 主线程 中打印 输出缓冲区 。但是我根本没有输出。我已经复制粘贴了我的代码,几乎所有步骤都有完整的注释。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/time.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdbool.h>
#include <string.h>

#define MthNum 1 //Number of Master threads
#define WthNum 2 //Number of Worker threads
#define times 5 // Number of times the iteration (n in the explanation)
#define elNum 10 //Chunk size during each iteration (k in the explanation)

pthread_mutex_t mutex; // mutex variable declaration
pthread_cond_t cond_var; //conditional variarble declaration
bool completion_flag = true; //This global flag indicates the completion of the worker thread. Turned false once all operation ends
                             //marking the completion
int *commonBuff; //common buffer between master and worker threads
int *commFlags; //array of flags that are turned to 1 by each worker threads. So worker thread i turns commFlags[i] to 1
                // the master thread turns commFlags[i] = 0 for i =0 to (WthNum - 1)
int *commFlags_s;
int counter; // This counter used my master thread to count if all the commFlags[i] that shows
             //all the threads finished their work on the common buffer
// static pthread_barrier_t barrier;
// Arguments structure passed to master thread
typedef struct{
    int *input; // input buffer
    int *output;// output buffer
}master_args;

// Arguments structure passed to worker thread
typedef struct{
    int threadId;
    int *outBuff;
}worker_args;

void* worker_func(void *arguments);
void *master_func(void *);

int main(int argc,char*argv[]){

    int *ipData,*opData;
    int i,j;

    // allocation of input buffer and initializing to 0
    ipData = (int *)malloc(times*elNum*sizeof(int));
    memset(ipData,0,times*elNum*sizeof(int));

    // allocation of output buffer and initializing to 0
    opData = (int *)malloc(times*elNum*sizeof(int));
    memset(opData,0,times*elNum*sizeof(int));

    pthread_t thread[MthNum];
    master_args* args[MthNum];


    //creating the single master thread and passing the arguments
    for( i=0;i<MthNum;i++){
        args[i] = (master_args *)malloc(sizeof(master_args));
        args[i]->input= ipData;
        args[i]->output= opData;
        pthread_create(&thread[i],NULL,master_func,(void *)args[i]);
    }

    //joining the master thred
    for(i=0;i<MthNum;i++){
        pthread_join(thread[i],NULL);
    }

    //printing the output buffer values
    for(j =0;j<times;j++ ){
        for(i =0;i<elNum;i++){
            printf("%d\t",opData[i+j*times]);
        }
      printf("\n");
    }

    return 0;
}

//This is the master thread function
void *master_func(void *arguments){

    //copying the arguments pointer to local variables
    master_args* localMasterArgs = (master_args *)arguments;
    int *indataArgs = localMasterArgs->input; //input buffer
    int *outdataArgs = localMasterArgs->output; //output buffer

    //worker thread declaration
    pthread_t Workers[WthNum];
    //worker thread arguments declaration
    worker_args* wArguments[WthNum];
    int i,j;

    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init (&cond_var, NULL);
    counter =0;

    commonBuff = (int *)malloc(elNum*sizeof(int));

    commFlags = (int *)malloc(WthNum*sizeof(int));
    memset(commFlags,0,WthNum*sizeof(int) );
    commFlags_s= (int *)malloc(WthNum*sizeof(int));
    memset(commFlags_s,0,WthNum*sizeof(int) );

    for(i =0;i<WthNum;i++){

        wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
        wArguments[i]->threadId = i;
        wArguments[i]->outBuff = outdataArgs;

        pthread_create(&Workers[i],NULL,worker_func,(void *)wArguments[i]);
    }

    for (i = 0; i < times; i++) {
        for (j = 0; j < elNum; j++)
            indataArgs[i + j * elNum] = i + j;

        while (counter != 0) {
            counter = 0;

            pthread_mutex_lock(&mutex);
            for (j = 0; j < WthNum; j++) {
                counter += commFlags_s[j];
            }
            pthread_mutex_unlock(&mutex);

        }
        pthread_mutex_lock(&mutex);
        memcpy(commonBuff, &indataArgs[i * elNum], sizeof(int));
        pthread_mutex_unlock(&mutex);
        counter = 1;
        while (counter != 0) {
            counter = 0;

            pthread_mutex_lock(&mutex);
            for (j = 0; j < WthNum; j++) {
                counter += commFlags[j];
            }
            pthread_mutex_unlock(&mutex);


        }
        // printf("master broad cast\n");
        pthread_mutex_lock(&mutex);
        pthread_cond_broadcast(&cond_var);
         //releasing the lock
        pthread_mutex_unlock(&mutex);

    }

    pthread_mutex_lock(&mutex);
     completion_flag = false;
    pthread_mutex_unlock(&mutex);

    for (i = 0; i < WthNum; i++) {
        pthread_join(Workers[i], NULL);
    }

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond_var);

    return NULL;
}


void* worker_func(void *arguments){

    worker_args* localArgs = (worker_args*)arguments;

    //copying the thread ID and the output buffer
    int tid = localArgs->threadId;
    int *localopBuffer = localArgs->outBuff;
    int i,j;
    bool local_completion_flag=false;

    while(local_completion_flag){

        pthread_mutex_lock(&mutex);
        commFlags[tid] =0;
        commFlags_s[tid] =1;
        pthread_cond_wait(&cond_var,&mutex);
        commFlags_s[tid] =0;
        commFlags[tid] =1;
        if (tid == 0) {
            for (i = 0; i < (elNum / 2); i++) {
                localopBuffer[i] = commonBuff[i] * 5;
            }
        } else { // Thread ID 1 operating on the other half of the common buffer data and placing on the
                 // output buffer
            for (i = 0; i < (elNum / 2); i++) {
                localopBuffer[elNum / 2 + i] = commonBuff[elNum / 2 + i] * 10;
            }
        }
         local_completion_flag=completion_flag;
        pthread_mutex_unlock(&mutex);//releasing the lock

    }

    return NULL;
}

但是我不知道我的实现哪里做错了,因为逻辑上它似乎是正确的。但是我的实现肯定有问题。我花了很长时间尝试不同的方法来修复它,但没有任何效果。很抱歉这么长 post 但我无法确定我可能做错的部分,所以我无法简明 post。因此,如果任何人都可以查看问题和实施,并可以建议需要对 运行 进行哪些更改,这将非常有帮助。感谢您的帮助和协助。

这段代码中有几个错误。

  1. 您可以从修复工作线程的创建开始:

    wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
    wArguments[i]->threadId = i;
    wArguments[i]->outBuff = outdataArgs;
    
    pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments);
    

您正在初始化 worker_args 结构但不正确 - 将指针传递给数组 (void *)wArguments 而不是传递给您刚刚初始化的数组元素的指针。

pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments[i]);
//                                                             ^^^
  1. 在启动使用它的值的线程之前初始化计数器:

    void *master_func(void *arguments)
    {
    /* (...) */
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init (&cond_var, NULL);
    counter = WthNum;
    
  2. 启动主线程时,错误地将指针传递给指针:

    pthread_create(&thread[i],NULL,master_func,(void *)&args[i]);
    

请将其更改为:

pthread_create(&thread[i],NULL,master_func,(void *) args[i]);
  1. 所有对 counter 变量的访问(与任何其他共享内存一样)必须在线程之间同步。

我认为你应该像这样使用基于信号量的生产者-消费者模型

https://jlmedina123.wordpress.com/2014/04/08/255/