消息队列:执行任务时出现段错误
Message Queue: segmentation fault when executing task
我在这个 C 作业上一直没有进展,希望有人能一眼看出问题所在:目标是使用 POSIX 消息队列实现一个线程池。线程在开始时创建,然后从消息队列中接收任务。每个线程接收一个类型为 task_t 的 16 字节结构体,其中包含要执行的函数的指针,以及指向该函数参数的指针。
线程处理第一个任务、也就是执行函数 foo 时,程序立即出现段错误。该函数在 threadPoolFun 中通过 task_ptr->func 访问,应以 task_ptr->this 作为参数运行:
void * threadPoolFun(void * arg) {
task_t * task_ptr;
int size = attr.mq_msgsize; //size = 16
char buf[size];
while (1) {
int ret_val = mq_receive(task_queue, buf, size, NULL);
if (ret_val == -1) {
perror("Error receiving message: ");
}
task_ptr = (task_t *) buf;
if (task_ptr->func == NULL) {
break;
}
task_ptr->func(task_ptr->this);
}
return NULL;
}
分段错误:
Program terminated with signal SIGSEGV, Segmentation fault.
#0 0x000000000040086a in foo (arg=0x7ffdd71fdfd0) at all_in_one.c:37
37 td->retVal = (double) td->tid;
[Current thread is 1 (Thread 0x7fec2d544700 (LWP 9853))]
(gdb) list
32 fooTask_t; // type for this tasks
33
34 //definition of task: write tid to retVal
35 void foo(void * arg) {
36 fooTask_t *td = (fooTask_t *) arg;
37 td->retVal = (double) td->tid;
38 }
39
40 // initialize task
41 void fooInit(fooTask_t * t) {
(gdb) backtrace
#0 0x000000000040086a in foo (arg=0x7ffdd71fdfd0) at all_in_one.c:37
#1 0x0000000000400b8c in threadPoolFun (arg=0x0) at all_in_one.c:118
#2 0x00007fec2f1196ba in start_thread (arg=0x7fec2d544700)
at pthread_create.c:333
#3 0x00007fec2ee4f41d in clone ()
完整代码:
#include <mqueue.h>
#include <pthread.h>
#include <stdio.h>
/* Pool and workload sizes. */
enum {
    NTH = 4,    /* number of worker threads */
    NTASKS = 10 /* number of tasks to dispatch */
};

/* Worker entry point, defined after main. */
void *threadPoolFun(void *arg);

/* Signature every task function has: it receives the task's `this` pointer. */
typedef void (*taskfun_t)(void *);
// Minimal task descriptor: exactly what travels through the message queue
// (its size, 16 bytes on a typical 64-bit target, is what main() configures
// as the queue's message size). A worker calls func(this); a NULL func is the
// stop signal that makes a worker return.
// The pointer values are copied into the message as-is, so `this` must point
// at an object that is still alive when a worker runs the task.
typedef struct minTaskDataStruct {
void * this;    // argument handed to func (the concrete task object)
taskfun_t func; // function to run; NULL means "stop this worker"
}
task_t;
// Concrete task type for foo(). Its first two members deliberately mirror
// task_t: main() sends only the first sizeof(task_t) bytes of each fooTask_t
// and the worker reinterprets them as a task_t, so `this` and `func` must stay
// the first two members, in this order and with these types.
typedef struct fooDataStruct {
// mandatory entries (layout must match task_t)
void * this; // pointer to this structure (the object that is actually sent)
taskfun_t func; // function with signature: void foo(void *)
// data for individual task instances
long tid; // task id
double retVal; // result, written by foo() on a worker thread
}
fooTask_t; // type for these tasks
// Task body for fooTask_t: publishes the task id as the task's result.
// `arg` is the task's `this` pointer, i.e. the fooTask_t to update; it must
// still refer to a live object when the worker runs this function.
void foo(void * arg) {
    fooTask_t *self = arg;
    self->retVal = (double) self->tid;
}
// Prepare *t for dispatch: install foo as its function and record t itself
// as the argument foo will receive. Because the address of *t is captured,
// t must be the object that is actually sent (not a temporary that is copied
// afterwards) and must outlive the task.
void fooInit(fooTask_t * t) {
    t->func = foo;
    t->this = (void *) t;
}
// Handles of the NTH worker threads, filled in by pthread_create in main().
pthread_t th[NTH];
// Task queue attributes: set by main() before mq_open and read by each
// worker (threadPoolFun) to size its receive buffer.
static struct mq_attr attr;
// Task queue descriptor: main() sends tasks on it, the workers receive from it.
mqd_t task_queue;
// The task objects. Being at file scope they exist for the whole run, so
// &td[i] is a safe value for a task's `this` pointer (a loop-local's address is not).
fooTask_t td[NTASKS];
int main() {
printf("Setting up tasks\n");
for (int i = 0; i < NTASKS; i++) {
fooTask_t task;
task.tid = i;
fooInit(&task);
td[i] = task;
}
// set attributes
attr.mq_flags = 0; /* Flags: 0 or O_NONBLOCK */
attr.mq_maxmsg = 10; /* Max. # of messages on queue */
attr.mq_msgsize = 16; /* Max. message size (bytes) */
printf("Opening task queue\n");
// set up task queue
task_queue = mq_open("/my_task_queue_mq", O_CREAT | O_RDWR, 0700, & attr);
//create threads with default attributes, pass threadpool function
//threads will run as long as func passed to them is not NULL
for (long i = 0; i < NTH; i++) {
printf("Creating thread %ld\n", i);
pthread_create( & th[i], NULL, threadPoolFun, NULL);
}
//send tasks to queue, tasks to be consumed by threads, only send first 16 bytes
for (int i = 0; i < NTASKS; i++) {
printf("Sending task %d\n", i);
mq_send(task_queue, (const char * ) &td[i], sizeof(task_t), 0);
}
//send dummy tasks with func==NULL to terminate threads
for (int i = 0; i < NTASKS; i++) {
printf("Sending dummy task %d\n", i);
task_t dummy_task;
dummy_task.this = & dummy_task;
dummy_task.func = NULL;
mq_send(task_queue, (const char * ) & dummy_task, sizeof(task_t), 0);
}
//verify task execution
int sum1 = 0;
int sum2 = 0;
for (int i = 0; i < NTASKS; i++) {
sum1 += td[i].retVal;
sum2 += i;
}
if (sum1 == sum2) {
printf("Success: Sum1 %d equals Sum2 %d", sum1, sum2);
} else {
printf("Fail: Sum1 %d does not Sum2 %d", sum1, sum2);
}
return 0;
}
//threadPoolFun function definition
void * threadPoolFun(void * arg) {
task_t * task_ptr;
int size = attr.mq_msgsize; //size = 16
char buf[size];
while (1) {
int ret_val = mq_receive(task_queue, buf, size, NULL);
if (ret_val == -1) {
perror("Error receiving message: ");
}
task_ptr = (task_t *) buf;
if (task_ptr->func == NULL) {
break;
}
task_ptr->func(task_ptr->this);
}
return NULL;
}
感谢任何帮助,非常感谢!
劳伦斯
问题在于:你在栈上对象的生命周期结束之后仍在使用它的地址。你的代码是这样的:
// Quoted from the question (buggy): the address stored in `this` is that of
// a loop-local that stops existing at the end of each iteration.
for (int i = 0; i < NTASKS; i++) {
fooTask_t task;      // loop-local: its lifetime ends with each iteration
task.tid = i;
fooInit(&task);      // BUG: stores &task (not &td[i]) in task.this
td[i] = task;        // the copy carries that soon-dangling pointer into td[i].this
}
`&task` 就是这样一个指针:fooInit 把它存进了 task.this,随后 td[i] = task 又把这个指针原样复制到 td[i].this 中,而 task 在每次循环迭代结束时就已失效。这很容易修复,因为你已经有了存放任务的地方(数组 td)。更好的写法是:
for (int i = 0; i < NTASKS; i++) {
    // Fill in the array element itself: fooInit stores its argument's address
    // in `this`, and td[i] (file scope) stays alive for the whole run.
    // Writing the fields directly also avoids copying a temporary whose other
    // members (e.g. retVal) were never initialised over the element.
    td[i].tid = i;
    td[i].retVal = 0.0;
    fooInit(&td[i]);
}
第二处同样的情况出现在这里:
// Quoted from the question. Note that it sends NTASKS stop sentinels although
// each of the NTH workers consumes only one, so the surplus stays in the queue.
for (int i = 0; i < NTASKS; i++) {
printf("Sending dummy task %d\n", i);
task_t dummy_task;               // lives only for this iteration
dummy_task.this = & dummy_task;  // never dereferenced: workers stop before calling when func is NULL
dummy_task.func = NULL;          // NULL func = stop signal for a worker
mq_send(task_queue, (const char * ) & dummy_task, sizeof(task_t), 0);  // queues a byte copy of the struct
}
由于虚拟任务不需要分别保存,可以把 dummy_task 的声明移到循环外面,这样它在 main 的整个执行期间都一直有效(严格来说,由于 func 为 NULL,工作线程从不解引用虚拟任务的 this 指针,所以这一处只是防御性的修改,并不是崩溃的原因):
// Stop sentinel: func == NULL tells a worker to exit. mq_send copies the
// message bytes, and a worker never dereferences `this` when func is NULL,
// so the pointer is simply left NULL.
const task_t dummy_task = { NULL, NULL };
//send one dummy task per worker thread: each worker exits after exactly one,
//so sending NTASKS of them would leave the surplus in the queue
for (int i = 0; i < NTH; i++) {
    fprintf(stderr, "Sending dummy task %d\n", i);
    if (mq_send(task_queue, (const char *) &dummy_task, sizeof(task_t), 0) == -1) {
        perror("Error sending dummy task");
    }
}
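做完这些修改之后,你的算法仍然存在同步问题。例如,main 在读取 td[i].retVal 之前没有用 pthread_join 等待工作线程结束。另外,命名消息队列在进程退出后依然存在:上一次运行残留的消息会被下一次运行读到,可以用 mq_unlink 删除该队列。这些是另外的问题,留给你自己解决。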
我把打印从 stdout 改成了 stderr,因为 stdout 是带缓冲的,程序崩溃时缓冲区中尚未输出的内容会丢失。
我在这个 C 作业上一直没有进展,希望有人能一眼看出问题所在:目标是使用 POSIX 消息队列实现一个线程池。线程在开始时创建,然后从消息队列中接收任务。每个线程接收一个类型为 task_t 的 16 字节结构体,其中包含要执行的函数的指针,以及指向该函数参数的指针。
线程处理第一个任务、也就是执行函数 foo 时,程序立即出现段错误。该函数在 threadPoolFun 中通过 task_ptr->func 访问,应以 task_ptr->this 作为参数运行:
void * threadPoolFun(void * arg) {
task_t * task_ptr;
int size = attr.mq_msgsize; //size = 16
char buf[size];
while (1) {
int ret_val = mq_receive(task_queue, buf, size, NULL);
if (ret_val == -1) {
perror("Error receiving message: ");
}
task_ptr = (task_t *) buf;
if (task_ptr->func == NULL) {
break;
}
task_ptr->func(task_ptr->this);
}
return NULL;
}
分段错误:
Program terminated with signal SIGSEGV, Segmentation fault.
#0 0x000000000040086a in foo (arg=0x7ffdd71fdfd0) at all_in_one.c:37
37 td->retVal = (double) td->tid;
[Current thread is 1 (Thread 0x7fec2d544700 (LWP 9853))]
(gdb) list
32 fooTask_t; // type for this tasks
33
34 //definition of task: write tid to retVal
35 void foo(void * arg) {
36 fooTask_t *td = (fooTask_t *) arg;
37 td->retVal = (double) td->tid;
38 }
39
40 // initialize task
41 void fooInit(fooTask_t * t) {
(gdb) backtrace
#0 0x000000000040086a in foo (arg=0x7ffdd71fdfd0) at all_in_one.c:37
#1 0x0000000000400b8c in threadPoolFun (arg=0x0) at all_in_one.c:118
#2 0x00007fec2f1196ba in start_thread (arg=0x7fec2d544700)
at pthread_create.c:333
#3 0x00007fec2ee4f41d in clone ()
完整代码:
#include <mqueue.h>
#include <pthread.h>
#include <stdio.h>
/* Pool and workload sizes. */
enum {
    NTH = 4,    /* number of worker threads */
    NTASKS = 10 /* number of tasks to dispatch */
};

/* Worker entry point, defined after main. */
void *threadPoolFun(void *arg);

/* Signature every task function has: it receives the task's `this` pointer. */
typedef void (*taskfun_t)(void *);
// Minimal task descriptor: exactly what travels through the message queue
// (its size, 16 bytes on a typical 64-bit target, is what main() configures
// as the queue's message size). A worker calls func(this); a NULL func is the
// stop signal that makes a worker return.
// The pointer values are copied into the message as-is, so `this` must point
// at an object that is still alive when a worker runs the task.
typedef struct minTaskDataStruct {
void * this;    // argument handed to func (the concrete task object)
taskfun_t func; // function to run; NULL means "stop this worker"
}
task_t;
// Concrete task type for foo(). Its first two members deliberately mirror
// task_t: main() sends only the first sizeof(task_t) bytes of each fooTask_t
// and the worker reinterprets them as a task_t, so `this` and `func` must stay
// the first two members, in this order and with these types.
typedef struct fooDataStruct {
// mandatory entries (layout must match task_t)
void * this; // pointer to this structure (the object that is actually sent)
taskfun_t func; // function with signature: void foo(void *)
// data for individual task instances
long tid; // task id
double retVal; // result, written by foo() on a worker thread
}
fooTask_t; // type for these tasks
// Task body for fooTask_t: publishes the task id as the task's result.
// `arg` is the task's `this` pointer, i.e. the fooTask_t to update; it must
// still refer to a live object when the worker runs this function.
void foo(void * arg) {
    fooTask_t *self = arg;
    self->retVal = (double) self->tid;
}
// Prepare *t for dispatch: install foo as its function and record t itself
// as the argument foo will receive. Because the address of *t is captured,
// t must be the object that is actually sent (not a temporary that is copied
// afterwards) and must outlive the task.
void fooInit(fooTask_t * t) {
    t->func = foo;
    t->this = (void *) t;
}
// Handles of the NTH worker threads, filled in by pthread_create in main().
pthread_t th[NTH];
// Task queue attributes: set by main() before mq_open and read by each
// worker (threadPoolFun) to size its receive buffer.
static struct mq_attr attr;
// Task queue descriptor: main() sends tasks on it, the workers receive from it.
mqd_t task_queue;
// The task objects. Being at file scope they exist for the whole run, so
// &td[i] is a safe value for a task's `this` pointer (a loop-local's address is not).
fooTask_t td[NTASKS];
int main() {
printf("Setting up tasks\n");
for (int i = 0; i < NTASKS; i++) {
fooTask_t task;
task.tid = i;
fooInit(&task);
td[i] = task;
}
// set attributes
attr.mq_flags = 0; /* Flags: 0 or O_NONBLOCK */
attr.mq_maxmsg = 10; /* Max. # of messages on queue */
attr.mq_msgsize = 16; /* Max. message size (bytes) */
printf("Opening task queue\n");
// set up task queue
task_queue = mq_open("/my_task_queue_mq", O_CREAT | O_RDWR, 0700, & attr);
//create threads with default attributes, pass threadpool function
//threads will run as long as func passed to them is not NULL
for (long i = 0; i < NTH; i++) {
printf("Creating thread %ld\n", i);
pthread_create( & th[i], NULL, threadPoolFun, NULL);
}
//send tasks to queue, tasks to be consumed by threads, only send first 16 bytes
for (int i = 0; i < NTASKS; i++) {
printf("Sending task %d\n", i);
mq_send(task_queue, (const char * ) &td[i], sizeof(task_t), 0);
}
//send dummy tasks with func==NULL to terminate threads
for (int i = 0; i < NTASKS; i++) {
printf("Sending dummy task %d\n", i);
task_t dummy_task;
dummy_task.this = & dummy_task;
dummy_task.func = NULL;
mq_send(task_queue, (const char * ) & dummy_task, sizeof(task_t), 0);
}
//verify task execution
int sum1 = 0;
int sum2 = 0;
for (int i = 0; i < NTASKS; i++) {
sum1 += td[i].retVal;
sum2 += i;
}
if (sum1 == sum2) {
printf("Success: Sum1 %d equals Sum2 %d", sum1, sum2);
} else {
printf("Fail: Sum1 %d does not Sum2 %d", sum1, sum2);
}
return 0;
}
//threadPoolFun function definition
void * threadPoolFun(void * arg) {
task_t * task_ptr;
int size = attr.mq_msgsize; //size = 16
char buf[size];
while (1) {
int ret_val = mq_receive(task_queue, buf, size, NULL);
if (ret_val == -1) {
perror("Error receiving message: ");
}
task_ptr = (task_t *) buf;
if (task_ptr->func == NULL) {
break;
}
task_ptr->func(task_ptr->this);
}
return NULL;
}
感谢任何帮助,非常感谢! 劳伦斯
问题在于:你在栈上对象的生命周期结束之后仍在使用它的地址。你的代码是这样的:
// Quoted from the question (buggy): the address stored in `this` is that of
// a loop-local that stops existing at the end of each iteration.
for (int i = 0; i < NTASKS; i++) {
fooTask_t task;      // loop-local: its lifetime ends with each iteration
task.tid = i;
fooInit(&task);      // BUG: stores &task (not &td[i]) in task.this
td[i] = task;        // the copy carries that soon-dangling pointer into td[i].this
}
`&task` 就是这样一个指针:fooInit 把它存进了 task.this,随后 td[i] = task 又把这个指针原样复制到 td[i].this 中,而 task 在每次循环迭代结束时就已失效。这很容易修复,因为你已经有了存放任务的地方(数组 td)。更好的写法是:
for (int i = 0; i < NTASKS; i++) {
    // Fill in the array element itself: fooInit stores its argument's address
    // in `this`, and td[i] (file scope) stays alive for the whole run.
    // Writing the fields directly also avoids copying a temporary whose other
    // members (e.g. retVal) were never initialised over the element.
    td[i].tid = i;
    td[i].retVal = 0.0;
    fooInit(&td[i]);
}
第二处同样的情况出现在这里:
// Quoted from the question. Note that it sends NTASKS stop sentinels although
// each of the NTH workers consumes only one, so the surplus stays in the queue.
for (int i = 0; i < NTASKS; i++) {
printf("Sending dummy task %d\n", i);
task_t dummy_task;               // lives only for this iteration
dummy_task.this = & dummy_task;  // never dereferenced: workers stop before calling when func is NULL
dummy_task.func = NULL;          // NULL func = stop signal for a worker
mq_send(task_queue, (const char * ) & dummy_task, sizeof(task_t), 0);  // queues a byte copy of the struct
}
由于虚拟任务不需要分别保存,可以把 dummy_task 的声明移到循环外面,这样它在 main 的整个执行期间都一直有效(严格来说,由于 func 为 NULL,工作线程从不解引用虚拟任务的 this 指针,所以这一处只是防御性的修改,并不是崩溃的原因):
// Stop sentinel: func == NULL tells a worker to exit. mq_send copies the
// message bytes, and a worker never dereferences `this` when func is NULL,
// so the pointer is simply left NULL.
const task_t dummy_task = { NULL, NULL };
//send one dummy task per worker thread: each worker exits after exactly one,
//so sending NTASKS of them would leave the surplus in the queue
for (int i = 0; i < NTH; i++) {
    fprintf(stderr, "Sending dummy task %d\n", i);
    if (mq_send(task_queue, (const char *) &dummy_task, sizeof(task_t), 0) == -1) {
        perror("Error sending dummy task");
    }
}
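做完这些修改之后,你的算法仍然存在同步问题。例如,main 在读取 td[i].retVal 之前没有用 pthread_join 等待工作线程结束。另外,命名消息队列在进程退出后依然存在:上一次运行残留的消息会被下一次运行读到,可以用 mq_unlink 删除该队列。这些是另外的问题,留给你自己解决。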
我把打印从 stdout 改成了 stderr,因为 stdout 是带缓冲的,程序崩溃时缓冲区中尚未输出的内容会丢失。