使用 pthreads 的简单流水线
Simple pipelining using pthreads
最近我开始研究 pthreads 并尝试使用 pthreads 实现软件流水线。为此,我自己编写了一个玩具程序,其中一个类似程序将成为我主要项目的一部分。
所以在这个程序中,主线程创建并输入和输出整数缓冲区类型,然后创建一个 主线程 并将这些缓冲区传递给 主线程 。 主线程 依次创建两个工作线程。
输入和从main传递到输出缓冲区 =37=]master thread 的大小为 nxk(例如 5x10 的大小 int)。 主线程 迭代大小为 k(即 10)的块 n(即 5)次数。
在主线程中有一个循环运行ning k(这里是5)次。在 k 的每次迭代中, 主线程 对大小为 n 的输入数据的一部分进行一些操作并将它放在 公共缓冲区 中 master 和 worker threads 共享。然后 主线程 通知工作线程数据已放入 公共缓冲区 。
两个工作线程等待来自主线程的信号,如果公共缓冲区 准备好了。 公共缓冲区上的操作在工作线程中分成一半。这意味着一个 工作线程 将在前半部分工作,而另一个 工作线程 将在 公共部分的下半部分工作缓冲区。
一旦 工作线程 从 主线程 获得信号,每个 工作线程 都会执行一些操作在他们的一半数据上,并将其复制到 输出缓冲区 。然后 工作线程 通过设置标志通知 主线程 它们在 公共缓冲区 上的操作已完成值。为 工作线程 创建了一组标志。 master 线程 继续检查是否设置了所有标志,这基本上意味着所有 worker 线程 完成了它们在 common 上的操作buffer 等 master thread 可以将下一个数据块安全地放入 common buffer worker thread's消费。
所以基本上 master 和 worker threads 之间以管道方式进行通信。最后,我在 主线程 中打印 输出缓冲区 。但是我根本没有输出。我已经复制粘贴了我的代码,几乎所有步骤都有完整的注释。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/time.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdbool.h>
#include <string.h>
#define MthNum 1 //Number of Master threads
#define WthNum 2 //Number of Worker threads
#define times 5 // Number of times the iteration (n in the explanation)
#define elNum 10 //Chunk size during each iteration (k in the explanation)
pthread_mutex_t mutex; // mutex variable declaration
pthread_cond_t cond_var; //conditional variarble declaration
bool completion_flag = true; //This global flag indicates the completion of the worker thread. Turned false once all operation ends
//marking the completion
int *commonBuff; //common buffer between master and worker threads
int *commFlags; //array of flags that are turned to 1 by each worker threads. So worker thread i turns commFlags[i] to 1
// the master thread turns commFlags[i] = 0 for i =0 to (WthNum - 1)
int *commFlags_s;
int counter; // This counter used my master thread to count if all the commFlags[i] that shows
//all the threads finished their work on the common buffer
// static pthread_barrier_t barrier;
// Arguments structure passed to master thread
typedef struct{
int *input; // input buffer
int *output;// output buffer
}master_args;
// Arguments structure passed to worker thread
typedef struct{
int threadId;
int *outBuff;
}worker_args;
void* worker_func(void *arguments);
void *master_func(void *);
int main(int argc,char*argv[]){
int *ipData,*opData;
int i,j;
// allocation of input buffer and initializing to 0
ipData = (int *)malloc(times*elNum*sizeof(int));
memset(ipData,0,times*elNum*sizeof(int));
// allocation of output buffer and initializing to 0
opData = (int *)malloc(times*elNum*sizeof(int));
memset(opData,0,times*elNum*sizeof(int));
pthread_t thread[MthNum];
master_args* args[MthNum];
//creating the single master thread and passing the arguments
for( i=0;i<MthNum;i++){
args[i] = (master_args *)malloc(sizeof(master_args));
args[i]->input= ipData;
args[i]->output= opData;
pthread_create(&thread[i],NULL,master_func,(void *)args[i]);
}
//joining the master thred
for(i=0;i<MthNum;i++){
pthread_join(thread[i],NULL);
}
//printing the output buffer values
for(j =0;j<times;j++ ){
for(i =0;i<elNum;i++){
printf("%d\t",opData[i+j*times]);
}
printf("\n");
}
return 0;
}
//This is the master thread function
void *master_func(void *arguments){
//copying the arguments pointer to local variables
master_args* localMasterArgs = (master_args *)arguments;
int *indataArgs = localMasterArgs->input; //input buffer
int *outdataArgs = localMasterArgs->output; //output buffer
//worker thread declaration
pthread_t Workers[WthNum];
//worker thread arguments declaration
worker_args* wArguments[WthNum];
int i,j;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init (&cond_var, NULL);
counter =0;
commonBuff = (int *)malloc(elNum*sizeof(int));
commFlags = (int *)malloc(WthNum*sizeof(int));
memset(commFlags,0,WthNum*sizeof(int) );
commFlags_s= (int *)malloc(WthNum*sizeof(int));
memset(commFlags_s,0,WthNum*sizeof(int) );
for(i =0;i<WthNum;i++){
wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
wArguments[i]->threadId = i;
wArguments[i]->outBuff = outdataArgs;
pthread_create(&Workers[i],NULL,worker_func,(void *)wArguments[i]);
}
for (i = 0; i < times; i++) {
for (j = 0; j < elNum; j++)
indataArgs[i + j * elNum] = i + j;
while (counter != 0) {
counter = 0;
pthread_mutex_lock(&mutex);
for (j = 0; j < WthNum; j++) {
counter += commFlags_s[j];
}
pthread_mutex_unlock(&mutex);
}
pthread_mutex_lock(&mutex);
memcpy(commonBuff, &indataArgs[i * elNum], sizeof(int));
pthread_mutex_unlock(&mutex);
counter = 1;
while (counter != 0) {
counter = 0;
pthread_mutex_lock(&mutex);
for (j = 0; j < WthNum; j++) {
counter += commFlags[j];
}
pthread_mutex_unlock(&mutex);
}
// printf("master broad cast\n");
pthread_mutex_lock(&mutex);
pthread_cond_broadcast(&cond_var);
//releasing the lock
pthread_mutex_unlock(&mutex);
}
pthread_mutex_lock(&mutex);
completion_flag = false;
pthread_mutex_unlock(&mutex);
for (i = 0; i < WthNum; i++) {
pthread_join(Workers[i], NULL);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond_var);
return NULL;
}
void* worker_func(void *arguments){
worker_args* localArgs = (worker_args*)arguments;
//copying the thread ID and the output buffer
int tid = localArgs->threadId;
int *localopBuffer = localArgs->outBuff;
int i,j;
bool local_completion_flag=false;
while(local_completion_flag){
pthread_mutex_lock(&mutex);
commFlags[tid] =0;
commFlags_s[tid] =1;
pthread_cond_wait(&cond_var,&mutex);
commFlags_s[tid] =0;
commFlags[tid] =1;
if (tid == 0) {
for (i = 0; i < (elNum / 2); i++) {
localopBuffer[i] = commonBuff[i] * 5;
}
} else { // Thread ID 1 operating on the other half of the common buffer data and placing on the
// output buffer
for (i = 0; i < (elNum / 2); i++) {
localopBuffer[elNum / 2 + i] = commonBuff[elNum / 2 + i] * 10;
}
}
local_completion_flag=completion_flag;
pthread_mutex_unlock(&mutex);//releasing the lock
}
return NULL;
}
但是我不知道我的实现哪里做错了,因为逻辑上它似乎是正确的。但是我的实现肯定有问题。我花了很长时间尝试不同的方法来修复它,但没有任何效果。很抱歉这么长 post 但我无法确定我可能做错的部分,所以我无法简明 post。因此,如果任何人都可以查看问题和实施,并可以建议需要对 运行 进行哪些更改,这将非常有帮助。感谢您的帮助和协助。
这段代码中有几个错误。
您可以从修复工作线程的创建开始:
wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
wArguments[i]->threadId = i;
wArguments[i]->outBuff = outdataArgs;
pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments);
您正在初始化 worker_args
结构但不正确 - 将指针传递给数组 (void *)wArguments
而不是传递给您刚刚初始化的数组元素的指针。
pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments[i]);
// ^^^
在启动使用它的值的线程之前初始化计数器:
void *master_func(void *arguments)
{
/* (...) */
pthread_mutex_init(&mutex, NULL);
pthread_cond_init (&cond_var, NULL);
counter = WthNum;
启动主线程时,错误地将指针传递给指针:
pthread_create(&thread[i],NULL,master_func,(void *)&args[i]);
请将其更改为:
pthread_create(&thread[i],NULL,master_func,(void *) args[i]);
- 所有对
counter
变量的访问(与任何其他共享内存一样)必须在线程之间同步。
我认为你应该像这样使用基于信号量的生产者-消费者模型
最近我开始研究 pthreads 并尝试使用 pthreads 实现软件流水线。为此,我自己编写了一个玩具程序,其中一个类似程序将成为我主要项目的一部分。
所以在这个程序中,主线程创建并输入和输出整数缓冲区类型,然后创建一个 主线程 并将这些缓冲区传递给 主线程 。 主线程 依次创建两个工作线程。
输入和从main传递到输出缓冲区 =37=]master thread 的大小为 nxk(例如 5x10 的大小 int)。 主线程 迭代大小为 k(即 10)的块 n(即 5)次数。 在主线程中有一个循环运行ning k(这里是5)次。在 k 的每次迭代中, 主线程 对大小为 n 的输入数据的一部分进行一些操作并将它放在 公共缓冲区 中 master 和 worker threads 共享。然后 主线程 通知工作线程数据已放入 公共缓冲区 。
两个工作线程等待来自主线程的信号,如果公共缓冲区 准备好了。 公共缓冲区上的操作在工作线程中分成一半。这意味着一个 工作线程 将在前半部分工作,而另一个 工作线程 将在 公共部分的下半部分工作缓冲区。 一旦 工作线程 从 主线程 获得信号,每个 工作线程 都会执行一些操作在他们的一半数据上,并将其复制到 输出缓冲区 。然后 工作线程 通过设置标志通知 主线程 它们在 公共缓冲区 上的操作已完成值。为 工作线程 创建了一组标志。 master 线程 继续检查是否设置了所有标志,这基本上意味着所有 worker 线程 完成了它们在 common 上的操作buffer 等 master thread 可以将下一个数据块安全地放入 common buffer worker thread's消费。
所以基本上 master 和 worker threads 之间以管道方式进行通信。最后,我在 主线程 中打印 输出缓冲区 。但是我根本没有输出。我已经复制粘贴了我的代码,几乎所有步骤都有完整的注释。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/time.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdbool.h>
#include <string.h>
#define MthNum 1 //Number of Master threads
#define WthNum 2 //Number of Worker threads
#define times 5 // Number of times the iteration (n in the explanation)
#define elNum 10 //Chunk size during each iteration (k in the explanation)
pthread_mutex_t mutex; // mutex variable declaration
pthread_cond_t cond_var; //conditional variarble declaration
bool completion_flag = true; //This global flag indicates the completion of the worker thread. Turned false once all operation ends
//marking the completion
int *commonBuff; //common buffer between master and worker threads
int *commFlags; //array of flags that are turned to 1 by each worker threads. So worker thread i turns commFlags[i] to 1
// the master thread turns commFlags[i] = 0 for i =0 to (WthNum - 1)
int *commFlags_s;
int counter; // This counter used my master thread to count if all the commFlags[i] that shows
//all the threads finished their work on the common buffer
// static pthread_barrier_t barrier;
// Arguments structure passed to master thread
typedef struct{
int *input; // input buffer
int *output;// output buffer
}master_args;
// Arguments structure passed to worker thread
typedef struct{
int threadId;
int *outBuff;
}worker_args;
void* worker_func(void *arguments);
void *master_func(void *);
int main(int argc,char*argv[]){
int *ipData,*opData;
int i,j;
// allocation of input buffer and initializing to 0
ipData = (int *)malloc(times*elNum*sizeof(int));
memset(ipData,0,times*elNum*sizeof(int));
// allocation of output buffer and initializing to 0
opData = (int *)malloc(times*elNum*sizeof(int));
memset(opData,0,times*elNum*sizeof(int));
pthread_t thread[MthNum];
master_args* args[MthNum];
//creating the single master thread and passing the arguments
for( i=0;i<MthNum;i++){
args[i] = (master_args *)malloc(sizeof(master_args));
args[i]->input= ipData;
args[i]->output= opData;
pthread_create(&thread[i],NULL,master_func,(void *)args[i]);
}
//joining the master thred
for(i=0;i<MthNum;i++){
pthread_join(thread[i],NULL);
}
//printing the output buffer values
for(j =0;j<times;j++ ){
for(i =0;i<elNum;i++){
printf("%d\t",opData[i+j*times]);
}
printf("\n");
}
return 0;
}
//This is the master thread function
void *master_func(void *arguments){
//copying the arguments pointer to local variables
master_args* localMasterArgs = (master_args *)arguments;
int *indataArgs = localMasterArgs->input; //input buffer
int *outdataArgs = localMasterArgs->output; //output buffer
//worker thread declaration
pthread_t Workers[WthNum];
//worker thread arguments declaration
worker_args* wArguments[WthNum];
int i,j;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init (&cond_var, NULL);
counter =0;
commonBuff = (int *)malloc(elNum*sizeof(int));
commFlags = (int *)malloc(WthNum*sizeof(int));
memset(commFlags,0,WthNum*sizeof(int) );
commFlags_s= (int *)malloc(WthNum*sizeof(int));
memset(commFlags_s,0,WthNum*sizeof(int) );
for(i =0;i<WthNum;i++){
wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
wArguments[i]->threadId = i;
wArguments[i]->outBuff = outdataArgs;
pthread_create(&Workers[i],NULL,worker_func,(void *)wArguments[i]);
}
for (i = 0; i < times; i++) {
for (j = 0; j < elNum; j++)
indataArgs[i + j * elNum] = i + j;
while (counter != 0) {
counter = 0;
pthread_mutex_lock(&mutex);
for (j = 0; j < WthNum; j++) {
counter += commFlags_s[j];
}
pthread_mutex_unlock(&mutex);
}
pthread_mutex_lock(&mutex);
memcpy(commonBuff, &indataArgs[i * elNum], sizeof(int));
pthread_mutex_unlock(&mutex);
counter = 1;
while (counter != 0) {
counter = 0;
pthread_mutex_lock(&mutex);
for (j = 0; j < WthNum; j++) {
counter += commFlags[j];
}
pthread_mutex_unlock(&mutex);
}
// printf("master broad cast\n");
pthread_mutex_lock(&mutex);
pthread_cond_broadcast(&cond_var);
//releasing the lock
pthread_mutex_unlock(&mutex);
}
pthread_mutex_lock(&mutex);
completion_flag = false;
pthread_mutex_unlock(&mutex);
for (i = 0; i < WthNum; i++) {
pthread_join(Workers[i], NULL);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond_var);
return NULL;
}
void* worker_func(void *arguments){
worker_args* localArgs = (worker_args*)arguments;
//copying the thread ID and the output buffer
int tid = localArgs->threadId;
int *localopBuffer = localArgs->outBuff;
int i,j;
bool local_completion_flag=false;
while(local_completion_flag){
pthread_mutex_lock(&mutex);
commFlags[tid] =0;
commFlags_s[tid] =1;
pthread_cond_wait(&cond_var,&mutex);
commFlags_s[tid] =0;
commFlags[tid] =1;
if (tid == 0) {
for (i = 0; i < (elNum / 2); i++) {
localopBuffer[i] = commonBuff[i] * 5;
}
} else { // Thread ID 1 operating on the other half of the common buffer data and placing on the
// output buffer
for (i = 0; i < (elNum / 2); i++) {
localopBuffer[elNum / 2 + i] = commonBuff[elNum / 2 + i] * 10;
}
}
local_completion_flag=completion_flag;
pthread_mutex_unlock(&mutex);//releasing the lock
}
return NULL;
}
但是我不知道我的实现哪里做错了,因为逻辑上它似乎是正确的。但是我的实现肯定有问题。我花了很长时间尝试不同的方法来修复它,但没有任何效果。很抱歉这么长 post 但我无法确定我可能做错的部分,所以我无法简明 post。因此,如果任何人都可以查看问题和实施,并可以建议需要对 运行 进行哪些更改,这将非常有帮助。感谢您的帮助和协助。
这段代码中有几个错误。
您可以从修复工作线程的创建开始:
wArguments[i] = (worker_args* )malloc(sizeof(worker_args)); wArguments[i]->threadId = i; wArguments[i]->outBuff = outdataArgs; pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments);
您正在初始化 worker_args
结构但不正确 - 将指针传递给数组 (void *)wArguments
而不是传递给您刚刚初始化的数组元素的指针。
pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments[i]);
// ^^^
在启动使用它的值的线程之前初始化计数器:
void *master_func(void *arguments) { /* (...) */ pthread_mutex_init(&mutex, NULL); pthread_cond_init (&cond_var, NULL); counter = WthNum;
启动主线程时,错误地将指针传递给指针:
pthread_create(&thread[i],NULL,master_func,(void *)&args[i]);
请将其更改为:
pthread_create(&thread[i],NULL,master_func,(void *) args[i]);
- 所有对
counter
变量的访问(与任何其他共享内存一样)必须在线程之间同步。
我认为你应该像这样使用基于信号量的生产者-消费者模型