大量内存使用,未检测到内存泄漏
Enormous amount of memory usage, no memory leak detected
我在发现程序中的内存泄漏时遇到问题。
top 显示程序运行时内存使用量不断增加。使用 valgrind 分析我的程序时,未报告内存泄漏。
程序由一个"reader"线程和几个"consumer"线程组成。
"reader" 线程将数据加载到多个 char** 指针之一,每个 "consumer" 线程一个。
"consumer" 线程处理其对应的 char* 指针的数据并释放内存。
我已经包含了一些伪代码来描述我的程序正在做什么。我知道提供的代码可能不足以描述问题。如果有帮助,我很乐意包含整个代码项目。
"reader" 线程,为简洁起见进行了压缩
//'nconsumers': number of consumer threads
//One queue (an array of char * slots) is allocated per consumer
char ***queue = malloc(nconsumers*sizeof(char **));
for (int i = 0; i < nconsumers; i++) {
//'length' number of datapoints a 'consumer' works on at a time
queue[i] = malloc(length*sizeof(char *));
}
//NOTE(review): 'data' is declared as char * but is used below like a struct
//pointer (data->length, data->content); this fragment is pseudo-code
char *data = NULL;
int qtracker = 0; //tracks to which 'consumer' data should be assigned
int ltracker = 0; //tracks how many datapoints have been added to each 'consumer'
//loaddata reads data and stores it in 'data' struct
while (loaddata(data) >= 0) {
//NOTE(review): 'legth' is presumably a typo for 'length' (compare the memcpy below)
char *datapoint = malloc(data->legth);
memcpy(datapoint, data->content, data->length);
//The freshly allocated copy is stored in the queue slot; it is not freed in this loop
queue[qtracker][ltracker] = datapoint;
//Round-robin over the consumers; advance to the next slot after a full round
qtracker++;
if (nconsumers == qtracker) {
qtracker = 0;
ltracker++;
//Wrap around to the first slot once every slot has been used
if (length == ltracker) ltracker = 0;
}
}
//NULL pointers are added to the end of each 'consumer' queues to indicate all data has been read
"consumer"线程
//Consumers are initialized and a queue is assigned to them
//NOTE(review): placeholder initializer, pseudo-code only
int qnum = "some number between 0 and nconsumers";
int datatracker = 0;
char **dataqueue = queue[qnum];
//NOTE(review): 'datapoint' is not declared in this fragment and the statement
//below is missing its terminating ';'
datapoint = dataqueue[datatracker]
datatracker++;
//Walk the queue until a NULL slot is found
while (datapoint != NULL) {
//Do work on data
//Release the buffer taken from the queue slot
free(datapoint);
datapoint = dataqueue[datatracker];
datatracker++;
//More synchronization code
}
"consumer" 线程正在正确读取数据并按应有的方式处理数据。同样,valgrind 报告没有内存泄漏。当使用 top 或 htop 监视我的进程时,该程序的内存使用量不断增加,直到我的机器开始交换。
编辑
我添加了一个能重现该问题的完整程序。这并不完全是我遇到问题的那个程序,因为原程序还包含其他依赖项。同样,该程序会创建 1 个 "reader" 线程和 N 个 "consumer" 线程。
当在具有数亿行的大型文本文件(例如 DNA 测序文件)上运行时,htop 显示内存使用量稳定地不断增加,而 valgrind 显示除了 pthreads 特有的内存泄漏外没有任何内存泄漏。
再次感谢大家的帮助!!
在任何现代 Linux 机器上编译并运行:
gcc -Wall -o <name> <program.c> -lm -lpthread
./<name> large_text_file.txt <num_threads> <max_lines>
只有这个警告应该出现,因为我在这个例子中使用了提取的指针:
<program>.c: In function ‘consumer’:
<program>.c:244:11: warning: variable ‘line’ set but not used [-Wunused-but-set-variable]
char *line = NULL;
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
// Data passed to threads: a single instance is shared, by pointer, between
// the reader thread and every consumer thread
typedef struct {
//Input file read by the reader thread
FILE *fp;
//Number of consumer threads (also the number of queues)
int numt;
//Synchronization data: one mutex protecting the flags below and one
//condition variable per direction (workcond: reader -> consumers,
//readcond: consumers -> reader)
pthread_mutex_t mtx;
pthread_cond_t workcond;
pthread_cond_t readcond;
//Set to 1 by the reader to release the consumers; reset to 0 by a consumer
//that has finished its share of the batch
int gowork;
//Set to 1 by the last consumer to finish a batch; reset to 0 by the reader
int goread;
//Tracks how many threads are done analyzing data
int doneq;
/*
Stores the "data queues" (1 queue per consumer thread):
  queue        array of numt queues
  queue[n]     array of maxqueue slots, each a char *
  queue[n][m]  one buffer stored by the reader, or NULL to mark the end of
               the data
*/
char ***queue;
//Internal thread ID pool: each consumer claims one entry as its queue index
//and overwrites it with -1
int *threadidx;
//Maximum number of lines to read per batch (set to maxqueue*numt)
int maxseqs;
//Maximum number of lines per thread == maxseqs/numthreads, rounded up
int maxqueue;
} thread_t;
/*
Reads lines from the text file, copies each line's length and content into a
newly allocated char * buffer and adds it to one of the "data queues" to be
processed by one of the consumers.
*/
void *reader(void *threaddata);
/*
Extracts char * buffers from one of the "data queues". Does work with
the data and frees each buffer when done.
*/
void *consumer(void *threaddata);
/*
Initializes thread data. main() treats a return value of 0 as failure.
*/
int threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs);
/*
Cleans thread data before exit: closes the input file and releases the
queues, the thread ID list and the mutex.
*/
void threadtkill(thread_t *threaddata);
int main(int argc, char *argv[])
{
if (argc < 4) {
fprintf(stderr, "ERROR: Not enough arguments.\n");
exit(-1);
}
FILE *fp = fopen(argv[1], "r");
if (!fp) {
fprintf(stderr, "ERROR: Failed to open input file.\n");
exit(-1);
}
int numt = atoi(argv[2]);
if (!numt) {
fprintf(stderr, "ERROR: Please specify number of threads.\n");
fclose(fp);
exit(-1);
}
int maxseqs = atoi(argv[3]);
if (!maxseqs) {
fprintf(stderr, "ERROR: Please specify max number of lines.\n");
fclose(fp);
exit(-1);
}
//Start data struct for threads
thread_t threaddata;
if (!threadtinit(fp, numt, &threaddata, maxseqs)) {
fprintf(stderr, "ERROR: Could not initialize thread data.\n");
fclose(fp);
exit(-1);
}
fprintf(stderr, "Thread data initialized.\n");
//return code
int ret;
//pthread creation
pthread_t readerthread;
pthread_t *consumerpool = NULL;
consumerpool = malloc((numt)*sizeof(pthread_t));
if (!consumerpool) {
fprintf(stderr, "Failed to allocate threads.\n");
ret = -1;
goto exit;
}
// Initialize and set thread detached attribute
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
//Consumer threads
int thrc;
for (int i = 0; i < numt; i++) {
thrc = pthread_create(consumerpool + i,
&attr,
consumer,
(void *)&threaddata);
if (thrc) {
fprintf(stderr, "ERROR: Thread creation.\n");
ret = -1;
goto exit;
}
}
//Couple of sleeps to keep track of stuff while running
sleep(1);
//Reader thread
thrc = pthread_create(&readerthread,
&attr,
reader,
(void *)&threaddata);
if (thrc) {
fprintf(stderr, "ERROR: Thread creation.\n");
ret = -1;
goto exit;
}
// Free attribute and wait for the other threads
pthread_attr_destroy(&attr);
int jrc;
jrc = pthread_join(readerthread, NULL);
if (jrc) {
fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
}
for (int i = 0; i < numt; i++) {
jrc = pthread_join(*(consumerpool + i), NULL);
if (jrc) {
fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
ret = -1;
goto exit;
}
}
ret = 0;
exit:
threadtkill(&threaddata);
free(consumerpool);
fprintf(stderr, "Finished.\n");
return(ret);
}
/*
Reader thread.

readt points to the shared thread_t. Every line read from threaddata->fp is
copied into a newly allocated buffer laid out as
    [ssize_t line length][line bytes][terminating NUL]
and stored round-robin in the consumer queues. After maxseqs lines the
consumers are released and the reader blocks until the last consumer sets
goread. At end of input a NULL is stored in every queue at the current row and
the consumers are released one final time.

Lines already stored in an incomplete final row cannot be delivered with this
scheme, because the NULL end marker has to occupy that row in every queue.
Those buffers are freed (instead of being leaked) and their number is reported.

Always terminates through pthread_exit(NULL).
*/
void *reader(void *readt)
{
    fprintf(stderr, "Reader thread started.\n");
    thread_t *threaddata = readt;
    int numt = threaddata->numt;
    int maxqueue = threaddata->maxqueue;
    // Converted once so that the comparison with nlines is unsigned on both sides
    size_t maxseqs = (size_t)threaddata->maxseqs;
    FILE *fp = threaddata->fp;
    // Array of queues, one per consumer thread
    char ***queue = threaddata->queue;
    // Number of bytes used to store length of line
    size_t bytes = sizeof(ssize_t);
    // Tracks number of lines loaded in the current batch
    size_t nlines = 0;
    // Tracks to which queue data should be added to
    int qtracker = 0;
    // Tracks to which position in any particular queue, data should be added
    int ltracker = 0;
    // getline() buffer and its capacity; getline() allocates and grows it
    char *line = NULL;
    size_t n = 0;
    ssize_t linelength = 0;
    // Tracks how much data has been read
    size_t totallength = 0;

    while ((linelength = getline(&line, &n, fp)) != -1) {
        // enough data is used to hold line length + line contents + NUL
        char *data = malloc(bytes + (size_t)linelength + 1);
        if (!data) {
            // Best effort: the line is skipped when memory is exhausted
            fprintf(stderr, "memerr\n");
            continue;
        }
        // move line length bytes to data
        memcpy(data, &linelength, bytes);
        // move line bytes (including the terminating NUL) to data
        memcpy(data + bytes, line, (size_t)linelength + 1);
        totallength += (size_t)linelength;

        // Add newly allocated data to one of numt queues
        queue[qtracker][ltracker] = data;
        qtracker++;
        if (numt == qtracker) {
            // Loop around queue
            qtracker = 0;
            ltracker++;
            // Loop around positions in queue
            if (maxqueue == ltracker) ltracker = 0;
        }
        nlines++;

        // Stop reading thread and start consumer threads
        if (nlines == maxseqs) {
            fprintf(stderr, "%zu lines loaded\n", nlines);
            sleep(3);
            nlines = 0;
            fprintf(stderr, "Waking up consumers\n");
            pthread_mutex_lock(&(threaddata->mtx));
            //Wake consumer threads
            threaddata->gowork = 1;
            pthread_cond_broadcast(&(threaddata->workcond));
            //Wait for consumer threads to finish
            while (!threaddata->goread) {
                pthread_cond_wait(&(threaddata->readcond),
                                  &(threaddata->mtx));
            }
            fprintf(stderr, "Reader has awoken!!!!\n\n");
            sleep(3);
            threaddata->goread = 0;
            pthread_mutex_unlock(&(threaddata->mtx));
        }
    }

    //Add NULL pointers to the end of each queue to indicate reading is done
    size_t dropped = 0;
    pthread_mutex_lock(&(threaddata->mtx));
    for (int i = 0; i < numt; i++) {
        // Queues [0, qtracker) already hold a line in this row; release it
        // before the end marker overwrites the only pointer to it.
        if (i < qtracker) {
            free(queue[i][ltracker]);
            dropped++;
        }
        queue[i][ltracker] = NULL;
    }
    // Wake consumers for the last time
    threaddata->gowork = 1;
    pthread_cond_broadcast(&(threaddata->workcond));
    pthread_mutex_unlock(&(threaddata->mtx));

    // Log info
    if (dropped) {
        fprintf(stderr, "%zu trailing lines discarded (incomplete final row).\n",
                dropped);
    }
    fprintf(stderr, "%zu characters read.\n", totallength);
    free(line);
    pthread_exit(NULL);
}
/*
Consumer thread.

consumert points to the shared thread_t. The thread claims one queue index
from threaddata->threadidx, waits until gowork is set and then walks its queue
until it finds a NULL slot. Each buffer starts with an ssize_t holding the line
length, followed by the line itself; the buffer is freed after use. After
maxqueue buffers the thread reports itself done, and the last thread to do so
wakes the reader; then it waits for the next batch.

Always terminates through pthread_exit(NULL).

NOTE(review): gowork is a plain flag that is cleared by whichever consumer
finishes a batch first. A consumer that has been signalled but has not yet
re-checked the flag could in principle see it cleared again and miss the
round; the sleep(1) below makes that unlikely. A per-round counter would make
the hand-over robust -- confirm before relying on this under load.
*/
void *consumer(void *consumert)
{
    thread_t *threaddata = consumert;
    // Number of consumer threads
    int numt = threaddata->numt;
    // Max length of queue to extract data from; converted once so that the
    // comparison with queuecounter is unsigned on both sides
    size_t maxqueue = (size_t)threaddata->maxqueue;
    // Holds data sent by reader thread
    char *data = NULL;
    // Holds the actual line read (placeholder for real work; it points into
    // 'data' and must not be used after free(data))
    char *line = NULL;
    // Same type the reader used when it stored the length header
    ssize_t linelength;
    size_t bytes = sizeof(ssize_t);

    // get queue number for corresponding thread: take the first entry that
    // has not been claimed yet and mark every visited entry as taken
    int qnum = -1;
    pthread_mutex_lock(&(threaddata->mtx));
    for (int i = 0; i < numt && qnum == -1; i++) {
        qnum = threaddata->threadidx[i];
        threaddata->threadidx[i] = -1;
    }
    if (qnum != -1) {
        fprintf(stderr, "Thread got queueID: %d.\n", qnum);
    }
    pthread_mutex_unlock(&(threaddata->mtx));
    if (qnum == -1) {
        // More consumers than queues: stop instead of scanning past the list
        fprintf(stderr, "ERROR: No free queueID for consumer thread.\n");
        pthread_exit(NULL);
    }

    // Any thread works on only one and one queue only
    char **queue = threaddata->queue[qnum];

    //After initializing, wait for reader to start working
    pthread_mutex_lock(&(threaddata->mtx));
    while (!threaddata->gowork) {
        pthread_cond_wait(&(threaddata->workcond), &(threaddata->mtx));
    }
    fprintf(stderr, "Consumer thread started queueID %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));

    // Tracks number of characters this thread consumes
    size_t totallength = 0;
    // Index of the NEXT slot to read; slot 0 is fetched right below
    size_t queuecounter = 1;
    // Get first data point
    data = queue[0];
    while (data != NULL) {
        //get line length
        memcpy(&linelength, data, bytes);
        //get line
        line = data + bytes;
        //Do work
        totallength += (size_t)linelength;
        free(data);
        //Check for number of sequences analyzed
        if (queuecounter == maxqueue) {
            // Wait for other threads to catchup
            sleep(1);
            queuecounter = 0;
            pthread_mutex_lock(&(threaddata->mtx));
            threaddata->doneq++;
            threaddata->gowork = 0;
            // If this thread is the last one to be done with its queue, wake
            // reader
            if (threaddata->doneq == numt) {
                threaddata->goread = 1;
                pthread_cond_signal(&(threaddata->readcond));
                threaddata->doneq = 0;
            }
            // When done consuming data, wait for reader to load more
            while (!threaddata->gowork) {
                pthread_cond_wait(&(threaddata->workcond),
                                  &(threaddata->mtx));
            }
            pthread_mutex_unlock(&(threaddata->mtx));
        }
        //Get next line
        data = queue[queuecounter];
        queuecounter++;
    }
    // Log and exit
    fprintf(stderr, "\tThread %d analyzed %zu characters.\n", qnum, totallength);
    pthread_exit(NULL);
}
/*
Initializes the shared thread data.

fp          input stream stored for the reader thread
numt        number of consumer threads / queues, must be > 0
threaddata  structure to fill in, must not be NULL
maxseqs     requested number of lines per batch, must be > 0; the value
            stored in threaddata->maxseqs is rounded up to a multiple of numt

Returns 1 on success. Returns 0 on invalid arguments, when the rounded batch
size does not fit in an int, or when an allocation or a pthread
initialization fails. On failure everything acquired here has been released
again; fp is never closed by this function.
*/
int threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs)
{
    if (!threaddata || numt <= 0 || maxseqs <= 0) {
        return 0;
    }

    //Determine maximum thread queue length with an exact integer ceiling
    //division (a float cannot represent every int above 2^24)
    int maxqueue = maxseqs / numt + (maxseqs % numt != 0);
    //maxqueue*numt is below 2*maxseqs, so it cannot wrap as unsigned; it is
    //accepted only if it also fits in an int (UINT_MAX/2)
    unsigned total = (unsigned)maxqueue * (unsigned)numt;
    if (total > (unsigned)-1 / 2) {
        return 0;
    }

    threaddata->fp = fp;
    threaddata->numt = numt;
    threaddata->maxqueue = maxqueue;
    threaddata->maxseqs = (int)total;
    fprintf(stderr, "max lines to load: %d\n", threaddata->maxseqs);
    fprintf(stderr, "max lines per thread: %d\n", threaddata->maxqueue);

    //Allocate data for queues and initialize them; calloc leaves every
    //pointer zeroed, so unused slots read as "no data"
    threaddata->queue = calloc((size_t)numt, sizeof *threaddata->queue);
    threaddata->threadidx = malloc((size_t)numt * sizeof *threaddata->threadidx);
    if (!threaddata->queue || !threaddata->threadidx) {
        goto fail_alloc;
    }
    for (int i = 0; i < numt; i++) {
        threaddata->queue[i] = calloc((size_t)maxqueue,
                                      sizeof *threaddata->queue[i]);
        if (!threaddata->queue[i]) {
            goto fail_alloc;
        }
        threaddata->threadidx[i] = i;
    }

    //Initialize synchronization data
    if (pthread_mutex_init(&(threaddata->mtx), NULL)) {
        goto fail_alloc;
    }
    if (pthread_cond_init(&(threaddata->workcond), NULL)) {
        goto fail_mutex;
    }
    if (pthread_cond_init(&(threaddata->readcond), NULL)) {
        goto fail_workcond;
    }
    threaddata->gowork = 0;
    threaddata->goread = 0;
    threaddata->doneq = 0;
    return 1;

fail_workcond:
    pthread_cond_destroy(&(threaddata->workcond));
fail_mutex:
    pthread_mutex_destroy(&(threaddata->mtx));
fail_alloc:
    if (threaddata->queue) {
        //Queues that were never allocated are still NULL; free(NULL) is a no-op
        for (int i = 0; i < numt; i++) {
            free(threaddata->queue[i]);
        }
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    threaddata->queue = NULL;
    threaddata->threadidx = NULL;
    return 0;
}
/*
Cleans thread data before exit.

Closes the input file, frees every queue, the queue array and the thread ID
list, and destroys both condition variables and the mutex. Must only be
called once no thread is using threaddata any more.
*/
void threadtkill(thread_t *threaddata)
{
    fclose(threaddata->fp);
    for (int i = 0; i < threaddata->numt; i++) {
        free(threaddata->queue[i]);
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    // Guard against accidental reuse after teardown
    threaddata->queue = NULL;
    threaddata->threadidx = NULL;
    // Condition variables are released before the mutex they are used with
    pthread_cond_destroy(&(threaddata->workcond));
    pthread_cond_destroy(&(threaddata->readcond));
    pthread_mutex_destroy(&(threaddata->mtx));
}
这一行看起来很可疑:
if (length == ltracker) ltracker++;
我通常希望看到:
if (length == ltracker) ltracker = 0; /* wrap */
但是没有完整的上下文,有点模糊。此外,很明显,您正在使用所有这些在生产者和消费者之间制造竞争,这可能比您当前的问题更难调试。
既然你用到了三级指针,你应该意识到缓冲区空间是 O(n^3);而且 free() 很少会缩小进程占用的内存。free 通常只是让你能够重用之前分配的堆空间;所以你的程序会一直增长,直到不再需要向系统申请更多内存,然后保持那个大小。
请注意,以下仅关注您称为 reader
和 consumer
线程的代码片段,尽管正如评论和其他答案中指出的那样,还有其他潜在来源应该审查问题...
在你的reader thread
中:
while (loaddata(data) >= 0) {
char *datapoint = malloc(data->legth);
...
// Note: there are no free(datapoint); calls in this loop
}
显然,datapoint
是在此块中创建的,但未在此块中释放。
以下可能是导致内存泄漏的因素:
因为reader thread
中的datapoint
实例是在块内创建的,它的生命只存在于那些块内。在该地址创建的内存继续由创建它的进程拥有,但在该块之外,指向该内存的指针变量不再存在,因此无法在该块之外释放。而且因为我在该块内没有看到对 free(datapopint)
的调用,所以它永远不会被释放。
再加上 char *datapoint = malloc(data->legth);
在循环中被反复调用(中间没有调用 free
),每次实际上都会在新地址分配新内存,同时覆盖掉指向前一块内存的地址,因此不可能再释放之前的所有分配。
consumer thread
中 datapoint
的实例,尽管与 reader thread
中的实例具有相同的符号,但并未指向相同的内存空间。因此,即使该变量被释放,它也不会释放存在于 reader thread
中的 datapoint
的实例。
摘自 consumer thread
的代码
datapoint = dataqueue[datatracker] //Note no ";" making this code uncompilable
//forcing the conclusion that code posted
//is not code actually used,
//Also: where is this instance of datapoint
//created ?
datatracker++;
while (datapoint != NULL) {
//Do work on data
free(datapoint);
datapoint = dataqueue[datatracker];
datatracker++;
//More synchronization code
}
评论中的每个问题和一般Linux线程信息:
Why doesn't Valgrind detect memory leaks, SO question
passing data between threads question
Creating threads in Linux tutorial
LinuxTtutorial: POSIX Threads
我的代码本身没有任何问题。在 malloc() 之后调用 free() 会释放堆上的内存以供程序重用,但这并不意味着内存会归还给系统。其中的原因我还不太理解。
Valgrind 没有报告内存泄漏,因为根本没有泄漏。
在做了一些研究、进一步了解了动态内存分配的本质,并读到以下问题之后:
Will malloc implementations return free-ed memory back to the system?
Memory not freed after calling free()
每次释放后调用 malloc_trim() 足以让系统回收分配的内存。
例如,如果不调用 malloc_trim(),我的程序的 CPU 和内存使用情况如下所示:
每次调用我的 "reader" 线程时(CPU 使用率的第一个峰值)都会分配一些内存。调用我的 "consumer" 线程会释放所申请的内存,但如图中蓝线所示,内存并不总是归还给系统。
在每个 free() 之后使用 malloc_trim(),内存使用情况与我预期的一样:
当 "reader" 线程正在执行时,进程占用的内存增加。当 "consumer" 线程运行时,内存被释放并归还给操作系统。
我在发现程序中的内存泄漏时遇到问题。
top 显示程序运行时内存使用量不断增加。使用 valgrind 分析我的程序时,未报告内存泄漏。
程序由一个"reader"线程和几个"consumer"线程组成。
"reader" 线程将数据加载到多个 char** 指针之一,每个 "consumer" 线程一个。
"consumer" 线程处理其对应的 char* 指针的数据并释放内存。
我已经包含了一些伪代码来描述我的程序正在做什么。我知道提供的代码可能不足以描述问题。如果有帮助,我很乐意包含整个代码项目。
"reader" 线程,为简洁起见进行了压缩
//'nconsumers': number of consumer threads
//One queue (an array of char * slots) is allocated per consumer
char ***queue = malloc(nconsumers*sizeof(char **));
for (int i = 0; i < nconsumers; i++) {
//'length' number of datapoints a 'consumer' works on at a time
queue[i] = malloc(length*sizeof(char *));
}
//NOTE(review): 'data' is declared as char * but is used below like a struct
//pointer (data->length, data->content); this fragment is pseudo-code
char *data = NULL;
int qtracker = 0; //tracks to which 'consumer' data should be assigned
int ltracker = 0; //tracks how many datapoints have been added to each 'consumer'
//loaddata reads data and stores it in 'data' struct
while (loaddata(data) >= 0) {
//NOTE(review): 'legth' is presumably a typo for 'length' (compare the memcpy below)
char *datapoint = malloc(data->legth);
memcpy(datapoint, data->content, data->length);
//The freshly allocated copy is stored in the queue slot; it is not freed in this loop
queue[qtracker][ltracker] = datapoint;
//Round-robin over the consumers; advance to the next slot after a full round
qtracker++;
if (nconsumers == qtracker) {
qtracker = 0;
ltracker++;
//Wrap around to the first slot once every slot has been used
if (length == ltracker) ltracker = 0;
}
}
//NULL pointers are added to the end of each 'consumer' queues to indicate all data has been read
"consumer"线程
//Consumers are initialized and a queue is assigned to them
//NOTE(review): placeholder initializer, pseudo-code only
int qnum = "some number between 0 and nconsumers";
int datatracker = 0;
char **dataqueue = queue[qnum];
//NOTE(review): 'datapoint' is not declared in this fragment and the statement
//below is missing its terminating ';'
datapoint = dataqueue[datatracker]
datatracker++;
//Walk the queue until a NULL slot is found
while (datapoint != NULL) {
//Do work on data
//Release the buffer taken from the queue slot
free(datapoint);
datapoint = dataqueue[datatracker];
datatracker++;
//More synchronization code
}
"consumer" 线程正在正确读取数据并按应有的方式处理数据。同样,valgrind 报告没有内存泄漏。当使用 top 或 htop 监视我的进程时,该程序的内存使用量不断增加,直到我的机器开始交换。
编辑
我添加了一个能重现该问题的完整程序。这并不完全是我遇到问题的那个程序,因为原程序还包含其他依赖项。同样,该程序会创建 1 个 "reader" 线程和 N 个 "consumer" 线程。当在具有数亿行的大型文本文件(例如 DNA 测序文件)上运行时,htop 显示内存使用量稳定地不断增加,而 valgrind 显示除了 pthreads 特有的内存泄漏外没有任何内存泄漏。
再次感谢大家的帮助!!
在任何现代 Linux 机器上编译并运行:
gcc -Wall -o <name> <program.c> -lm -lpthread
./<name> large_text_file.txt <num_threads> <max_lines>
只有这个警告应该出现,因为我在这个例子中使用了提取的指针:
<program>.c: In function ‘consumer’:
<program>.c:244:11: warning: variable ‘line’ set but not used [-Wunused-but-set-variable]
char *line = NULL;
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
// Data passed to threads: a single instance is shared, by pointer, between
// the reader thread and every consumer thread
typedef struct {
//Input file read by the reader thread
FILE *fp;
//Number of consumer threads (also the number of queues)
int numt;
//Synchronization data: one mutex protecting the flags below and one
//condition variable per direction (workcond: reader -> consumers,
//readcond: consumers -> reader)
pthread_mutex_t mtx;
pthread_cond_t workcond;
pthread_cond_t readcond;
//Set to 1 by the reader to release the consumers; reset to 0 by a consumer
//that has finished its share of the batch
int gowork;
//Set to 1 by the last consumer to finish a batch; reset to 0 by the reader
int goread;
//Tracks how many threads are done analyzing data
int doneq;
/*
Stores the "data queues" (1 queue per consumer thread):
  queue        array of numt queues
  queue[n]     array of maxqueue slots, each a char *
  queue[n][m]  one buffer stored by the reader, or NULL to mark the end of
               the data
*/
char ***queue;
//Internal thread ID pool: each consumer claims one entry as its queue index
//and overwrites it with -1
int *threadidx;
//Maximum number of lines to read per batch (set to maxqueue*numt)
int maxseqs;
//Maximum number of lines per thread == maxseqs/numthreads, rounded up
int maxqueue;
} thread_t;
/*
Reads lines from the text file, copies each line's length and content into a
newly allocated char * buffer and adds it to one of the "data queues" to be
processed by one of the consumers.
*/
void *reader(void *threaddata);
/*
Extracts char * buffers from one of the "data queues". Does work with
the data and frees each buffer when done.
*/
void *consumer(void *threaddata);
/*
Initializes thread data. main() treats a return value of 0 as failure.
*/
int threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs);
/*
Cleans thread data before exit: closes the input file and releases the
queues, the thread ID list and the mutex.
*/
void threadtkill(thread_t *threaddata);
int main(int argc, char *argv[])
{
if (argc < 4) {
fprintf(stderr, "ERROR: Not enough arguments.\n");
exit(-1);
}
FILE *fp = fopen(argv[1], "r");
if (!fp) {
fprintf(stderr, "ERROR: Failed to open input file.\n");
exit(-1);
}
int numt = atoi(argv[2]);
if (!numt) {
fprintf(stderr, "ERROR: Please specify number of threads.\n");
fclose(fp);
exit(-1);
}
int maxseqs = atoi(argv[3]);
if (!maxseqs) {
fprintf(stderr, "ERROR: Please specify max number of lines.\n");
fclose(fp);
exit(-1);
}
//Start data struct for threads
thread_t threaddata;
if (!threadtinit(fp, numt, &threaddata, maxseqs)) {
fprintf(stderr, "ERROR: Could not initialize thread data.\n");
fclose(fp);
exit(-1);
}
fprintf(stderr, "Thread data initialized.\n");
//return code
int ret;
//pthread creation
pthread_t readerthread;
pthread_t *consumerpool = NULL;
consumerpool = malloc((numt)*sizeof(pthread_t));
if (!consumerpool) {
fprintf(stderr, "Failed to allocate threads.\n");
ret = -1;
goto exit;
}
// Initialize and set thread detached attribute
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
//Consumer threads
int thrc;
for (int i = 0; i < numt; i++) {
thrc = pthread_create(consumerpool + i,
&attr,
consumer,
(void *)&threaddata);
if (thrc) {
fprintf(stderr, "ERROR: Thread creation.\n");
ret = -1;
goto exit;
}
}
//Couple of sleeps to keep track of stuff while running
sleep(1);
//Reader thread
thrc = pthread_create(&readerthread,
&attr,
reader,
(void *)&threaddata);
if (thrc) {
fprintf(stderr, "ERROR: Thread creation.\n");
ret = -1;
goto exit;
}
// Free attribute and wait for the other threads
pthread_attr_destroy(&attr);
int jrc;
jrc = pthread_join(readerthread, NULL);
if (jrc) {
fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
}
for (int i = 0; i < numt; i++) {
jrc = pthread_join(*(consumerpool + i), NULL);
if (jrc) {
fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
ret = -1;
goto exit;
}
}
ret = 0;
exit:
threadtkill(&threaddata);
free(consumerpool);
fprintf(stderr, "Finished.\n");
return(ret);
}
/*
Reader thread.

readt points to the shared thread_t. Every line read from threaddata->fp is
copied into a newly allocated buffer laid out as
    [ssize_t line length][line bytes][terminating NUL]
and stored round-robin in the consumer queues. After maxseqs lines the
consumers are released and the reader blocks until the last consumer sets
goread. At end of input a NULL is stored in every queue at the current row and
the consumers are released one final time.

Lines already stored in an incomplete final row cannot be delivered with this
scheme, because the NULL end marker has to occupy that row in every queue.
Those buffers are freed (instead of being leaked) and their number is reported.

Always terminates through pthread_exit(NULL).
*/
void *reader(void *readt)
{
    fprintf(stderr, "Reader thread started.\n");
    thread_t *threaddata = readt;
    int numt = threaddata->numt;
    int maxqueue = threaddata->maxqueue;
    // Converted once so that the comparison with nlines is unsigned on both sides
    size_t maxseqs = (size_t)threaddata->maxseqs;
    FILE *fp = threaddata->fp;
    // Array of queues, one per consumer thread
    char ***queue = threaddata->queue;
    // Number of bytes used to store length of line
    size_t bytes = sizeof(ssize_t);
    // Tracks number of lines loaded in the current batch
    size_t nlines = 0;
    // Tracks to which queue data should be added to
    int qtracker = 0;
    // Tracks to which position in any particular queue, data should be added
    int ltracker = 0;
    // getline() buffer and its capacity; getline() allocates and grows it
    char *line = NULL;
    size_t n = 0;
    ssize_t linelength = 0;
    // Tracks how much data has been read
    size_t totallength = 0;

    while ((linelength = getline(&line, &n, fp)) != -1) {
        // enough data is used to hold line length + line contents + NUL
        char *data = malloc(bytes + (size_t)linelength + 1);
        if (!data) {
            // Best effort: the line is skipped when memory is exhausted
            fprintf(stderr, "memerr\n");
            continue;
        }
        // move line length bytes to data
        memcpy(data, &linelength, bytes);
        // move line bytes (including the terminating NUL) to data
        memcpy(data + bytes, line, (size_t)linelength + 1);
        totallength += (size_t)linelength;

        // Add newly allocated data to one of numt queues
        queue[qtracker][ltracker] = data;
        qtracker++;
        if (numt == qtracker) {
            // Loop around queue
            qtracker = 0;
            ltracker++;
            // Loop around positions in queue
            if (maxqueue == ltracker) ltracker = 0;
        }
        nlines++;

        // Stop reading thread and start consumer threads
        if (nlines == maxseqs) {
            fprintf(stderr, "%zu lines loaded\n", nlines);
            sleep(3);
            nlines = 0;
            fprintf(stderr, "Waking up consumers\n");
            pthread_mutex_lock(&(threaddata->mtx));
            //Wake consumer threads
            threaddata->gowork = 1;
            pthread_cond_broadcast(&(threaddata->workcond));
            //Wait for consumer threads to finish
            while (!threaddata->goread) {
                pthread_cond_wait(&(threaddata->readcond),
                                  &(threaddata->mtx));
            }
            fprintf(stderr, "Reader has awoken!!!!\n\n");
            sleep(3);
            threaddata->goread = 0;
            pthread_mutex_unlock(&(threaddata->mtx));
        }
    }

    //Add NULL pointers to the end of each queue to indicate reading is done
    size_t dropped = 0;
    pthread_mutex_lock(&(threaddata->mtx));
    for (int i = 0; i < numt; i++) {
        // Queues [0, qtracker) already hold a line in this row; release it
        // before the end marker overwrites the only pointer to it.
        if (i < qtracker) {
            free(queue[i][ltracker]);
            dropped++;
        }
        queue[i][ltracker] = NULL;
    }
    // Wake consumers for the last time
    threaddata->gowork = 1;
    pthread_cond_broadcast(&(threaddata->workcond));
    pthread_mutex_unlock(&(threaddata->mtx));

    // Log info
    if (dropped) {
        fprintf(stderr, "%zu trailing lines discarded (incomplete final row).\n",
                dropped);
    }
    fprintf(stderr, "%zu characters read.\n", totallength);
    free(line);
    pthread_exit(NULL);
}
/*
Consumer thread.

consumert points to the shared thread_t. The thread claims one queue index
from threaddata->threadidx, waits until gowork is set and then walks its queue
until it finds a NULL slot. Each buffer starts with an ssize_t holding the line
length, followed by the line itself; the buffer is freed after use. After
maxqueue buffers the thread reports itself done, and the last thread to do so
wakes the reader; then it waits for the next batch.

Always terminates through pthread_exit(NULL).

NOTE(review): gowork is a plain flag that is cleared by whichever consumer
finishes a batch first. A consumer that has been signalled but has not yet
re-checked the flag could in principle see it cleared again and miss the
round; the sleep(1) below makes that unlikely. A per-round counter would make
the hand-over robust -- confirm before relying on this under load.
*/
void *consumer(void *consumert)
{
    thread_t *threaddata = consumert;
    // Number of consumer threads
    int numt = threaddata->numt;
    // Max length of queue to extract data from; converted once so that the
    // comparison with queuecounter is unsigned on both sides
    size_t maxqueue = (size_t)threaddata->maxqueue;
    // Holds data sent by reader thread
    char *data = NULL;
    // Holds the actual line read (placeholder for real work; it points into
    // 'data' and must not be used after free(data))
    char *line = NULL;
    // Same type the reader used when it stored the length header
    ssize_t linelength;
    size_t bytes = sizeof(ssize_t);

    // get queue number for corresponding thread: take the first entry that
    // has not been claimed yet and mark every visited entry as taken
    int qnum = -1;
    pthread_mutex_lock(&(threaddata->mtx));
    for (int i = 0; i < numt && qnum == -1; i++) {
        qnum = threaddata->threadidx[i];
        threaddata->threadidx[i] = -1;
    }
    if (qnum != -1) {
        fprintf(stderr, "Thread got queueID: %d.\n", qnum);
    }
    pthread_mutex_unlock(&(threaddata->mtx));
    if (qnum == -1) {
        // More consumers than queues: stop instead of scanning past the list
        fprintf(stderr, "ERROR: No free queueID for consumer thread.\n");
        pthread_exit(NULL);
    }

    // Any thread works on only one and one queue only
    char **queue = threaddata->queue[qnum];

    //After initializing, wait for reader to start working
    pthread_mutex_lock(&(threaddata->mtx));
    while (!threaddata->gowork) {
        pthread_cond_wait(&(threaddata->workcond), &(threaddata->mtx));
    }
    fprintf(stderr, "Consumer thread started queueID %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));

    // Tracks number of characters this thread consumes
    size_t totallength = 0;
    // Index of the NEXT slot to read; slot 0 is fetched right below
    size_t queuecounter = 1;
    // Get first data point
    data = queue[0];
    while (data != NULL) {
        //get line length
        memcpy(&linelength, data, bytes);
        //get line
        line = data + bytes;
        //Do work
        totallength += (size_t)linelength;
        free(data);
        //Check for number of sequences analyzed
        if (queuecounter == maxqueue) {
            // Wait for other threads to catchup
            sleep(1);
            queuecounter = 0;
            pthread_mutex_lock(&(threaddata->mtx));
            threaddata->doneq++;
            threaddata->gowork = 0;
            // If this thread is the last one to be done with its queue, wake
            // reader
            if (threaddata->doneq == numt) {
                threaddata->goread = 1;
                pthread_cond_signal(&(threaddata->readcond));
                threaddata->doneq = 0;
            }
            // When done consuming data, wait for reader to load more
            while (!threaddata->gowork) {
                pthread_cond_wait(&(threaddata->workcond),
                                  &(threaddata->mtx));
            }
            pthread_mutex_unlock(&(threaddata->mtx));
        }
        //Get next line
        data = queue[queuecounter];
        queuecounter++;
    }
    // Log and exit
    fprintf(stderr, "\tThread %d analyzed %zu characters.\n", qnum, totallength);
    pthread_exit(NULL);
}
/*
Initializes the shared thread data.

fp          input stream stored for the reader thread
numt        number of consumer threads / queues, must be > 0
threaddata  structure to fill in, must not be NULL
maxseqs     requested number of lines per batch, must be > 0; the value
            stored in threaddata->maxseqs is rounded up to a multiple of numt

Returns 1 on success. Returns 0 on invalid arguments, when the rounded batch
size does not fit in an int, or when an allocation or a pthread
initialization fails. On failure everything acquired here has been released
again; fp is never closed by this function.
*/
int threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs)
{
    if (!threaddata || numt <= 0 || maxseqs <= 0) {
        return 0;
    }

    //Determine maximum thread queue length with an exact integer ceiling
    //division (a float cannot represent every int above 2^24)
    int maxqueue = maxseqs / numt + (maxseqs % numt != 0);
    //maxqueue*numt is below 2*maxseqs, so it cannot wrap as unsigned; it is
    //accepted only if it also fits in an int (UINT_MAX/2)
    unsigned total = (unsigned)maxqueue * (unsigned)numt;
    if (total > (unsigned)-1 / 2) {
        return 0;
    }

    threaddata->fp = fp;
    threaddata->numt = numt;
    threaddata->maxqueue = maxqueue;
    threaddata->maxseqs = (int)total;
    fprintf(stderr, "max lines to load: %d\n", threaddata->maxseqs);
    fprintf(stderr, "max lines per thread: %d\n", threaddata->maxqueue);

    //Allocate data for queues and initialize them; calloc leaves every
    //pointer zeroed, so unused slots read as "no data"
    threaddata->queue = calloc((size_t)numt, sizeof *threaddata->queue);
    threaddata->threadidx = malloc((size_t)numt * sizeof *threaddata->threadidx);
    if (!threaddata->queue || !threaddata->threadidx) {
        goto fail_alloc;
    }
    for (int i = 0; i < numt; i++) {
        threaddata->queue[i] = calloc((size_t)maxqueue,
                                      sizeof *threaddata->queue[i]);
        if (!threaddata->queue[i]) {
            goto fail_alloc;
        }
        threaddata->threadidx[i] = i;
    }

    //Initialize synchronization data
    if (pthread_mutex_init(&(threaddata->mtx), NULL)) {
        goto fail_alloc;
    }
    if (pthread_cond_init(&(threaddata->workcond), NULL)) {
        goto fail_mutex;
    }
    if (pthread_cond_init(&(threaddata->readcond), NULL)) {
        goto fail_workcond;
    }
    threaddata->gowork = 0;
    threaddata->goread = 0;
    threaddata->doneq = 0;
    return 1;

fail_workcond:
    pthread_cond_destroy(&(threaddata->workcond));
fail_mutex:
    pthread_mutex_destroy(&(threaddata->mtx));
fail_alloc:
    if (threaddata->queue) {
        //Queues that were never allocated are still NULL; free(NULL) is a no-op
        for (int i = 0; i < numt; i++) {
            free(threaddata->queue[i]);
        }
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    threaddata->queue = NULL;
    threaddata->threadidx = NULL;
    return 0;
}
/*
Cleans thread data before exit.

Closes the input file, frees every queue, the queue array and the thread ID
list, and destroys both condition variables and the mutex. Must only be
called once no thread is using threaddata any more.
*/
void threadtkill(thread_t *threaddata)
{
    fclose(threaddata->fp);
    for (int i = 0; i < threaddata->numt; i++) {
        free(threaddata->queue[i]);
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    // Guard against accidental reuse after teardown
    threaddata->queue = NULL;
    threaddata->threadidx = NULL;
    // Condition variables are released before the mutex they are used with
    pthread_cond_destroy(&(threaddata->workcond));
    pthread_cond_destroy(&(threaddata->readcond));
    pthread_mutex_destroy(&(threaddata->mtx));
}
这一行看起来很可疑:
if (length == ltracker) ltracker++;
我通常希望看到:
if (length == ltracker) ltracker = 0; /* wrap */
但是没有完整的上下文,有点模糊。此外,很明显,您正在使用所有这些在生产者和消费者之间制造竞争,这可能比您当前的问题更难调试。
既然你用到了三级指针,你应该意识到缓冲区空间是 O(n^3);而且 free() 很少会缩小进程占用的内存。free 通常只是让你能够重用之前分配的堆空间;所以你的程序会一直增长,直到不再需要向系统申请更多内存,然后保持那个大小。
请注意,以下仅关注您称为 reader
和 consumer
线程的代码片段,尽管正如评论和其他答案中指出的那样,还有其他潜在来源应该审查问题...
在你的reader thread
中:
while (loaddata(data) >= 0) {
char *datapoint = malloc(data->legth);
...
// Note: there are no free(datapoint); calls in this loop
}
显然,datapoint
是在此块中创建的,但未在此块中释放。
以下可能是导致内存泄漏的因素:
因为
reader thread
中的datapoint
实例是在块内创建的,它的生命只存在于那些块内。在该地址创建的内存继续由创建它的进程拥有,但在该块之外,指向该内存的指针变量不再存在,因此无法在该块之外释放。而且因为我在该块内没有看到对free(datapopint)
的调用,所以它永远不会被释放。再加上
char *datapoint = malloc(data->legth);
在循环中被反复调用(中间没有调用free
),每次实际上都会在新地址分配新内存,同时覆盖掉指向前一块内存的地址,因此不可能再释放之前的所有分配。
consumer thread
中 datapoint
的实例,尽管与 reader thread
中的实例具有相同的符号,但并未指向相同的内存空间。因此,即使该变量被释放,它也不会释放存在于 reader thread
中的 datapoint
的实例。
摘自 consumer thread
datapoint = dataqueue[datatracker] //Note no ";" making this code uncompilable
//forcing the conclusion that code posted
//is not code actually used,
//Also: where is this instance of datapoint
//created ?
datatracker++;
while (datapoint != NULL) {
//Do work on data
free(datapoint);
datapoint = dataqueue[datatracker];
datatracker++;
//More synchronization code
}
评论中的每个问题和一般Linux线程信息:
Why doesn't Valgrind detect memory leaks, SO question
passing data between threads question
Creating threads in Linux tutorial
LinuxTtutorial: POSIX Threads
我的代码本身没有任何问题。在 malloc() 之后调用 free() 会释放堆上的内存以供程序重用,但这并不意味着内存会归还给系统。其中的原因我还不太理解。
Valgrind 没有报告内存泄漏,因为根本没有泄漏。
在做了一些研究、进一步了解了动态内存分配的本质,并读到以下问题之后:
Will malloc implementations return free-ed memory back to the system?
Memory not freed after calling free()
每次释放后调用 malloc_trim() 足以让系统回收分配的内存。
例如,如果不调用 malloc_trim(),我的程序的 CPU 和内存使用情况如下所示:
在每个 free() 之后使用 malloc_trim(),内存使用情况与我预期的一样: