Enormous amount of memory usage, no memory leak detected

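I am having trouble finding a memory leak in my program.

top reports that the program's memory usage keeps increasing as it runs. When I profile the program with valgrind, no memory leaks are reported.

The program consists of one "reader" thread and several "consumer" threads.

The "reader" thread loads data into one of several char** pointers, one for each "consumer" thread.

Each "consumer" thread works on the data of its corresponding char* pointers and frees the memory.

I have included some pseudocode that describes what my program is doing. I know the code provided might not be enough to describe the problem. I am happy to include the entire code project if that helps.

"reader" thread, condensed for brevity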

//'nconsumers': number of consumer threads
char ***queue = malloc(nconsumers*sizeof(char **));
for (int i = 0; i < nconsumers; i++) {
    //'length' number of datapoints a 'consumer' works on at a time
    queue[i] = malloc(length*sizeof(char *));
}

char *data = NULL;
int qtracker = 0; //tracks to which 'consumer' data should be assigned
int ltracker = 0; //tracks how many datapoints have been added to each 'consumer'
//loaddata reads data and stores it in 'data' struct
while (loaddata(data) >= 0) {
    char *datapoint = malloc(data->length);
    memcpy(datapoint, data->content, data->length);
    queue[qtracker][ltracker] = datapoint;
    qtracker++;
    if (nconsumers == qtracker) { 
        qtracker = 0;
        ltracker++;
        if (length == ltracker) ltracker = 0;
    }
}
//NULL pointers are added to the end of each 'consumer' queues to indicate all data has been read

"consumer" thread

//Consumers are initialized and a queue is assigned to them
int qnum = "some number between 0 and nconsumers";
int datatracker = 0;
char **dataqueue = queue[qnum];

datapoint = dataqueue[datatracker]
datatracker++;
while (datapoint != NULL) {
    //Do work on data
    free(datapoint);
    datapoint = dataqueue[datatracker];
    datatracker++;

    //More synchronization code
}

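The "consumer" threads are reading the data correctly and processing it as they should. Again, valgrind reports no memory leaks. When I monitor the process with top or htop, the program's memory usage keeps growing until my machine starts swapping.

Edit

I have added a complete program that reproduces the problem. It is not exactly the program in which I am having the issue, since that one has additional dependencies. Again, this program spawns 1 "reader" thread and N consumer threads. When run on a large text file with hundreds of millions of lines (such as DNA sequencing files), htop steadily shows growing memory usage, while valgrind shows no memory leaks except for pthreads-specific ones.

Thanks again for all the help!!

Compile and run on any modern Linux box: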

gcc -Wall -o <name> <program.c> -lm -lpthread
./name large_text_file.txt <num_threads> <>

Only this warning should appear, since I make no use of the extracted pointer in this example:

<program>.c: In function ‘consumer’:
<program>.c:244:11: warning: variable ‘line’ set but not used [-Wunused-but-set-variable]
     char *line = NULL;

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <math.h>
#include <unistd.h>

// Data passed to threads
typedef struct {
    //Input file
    FILE *fp;
    //Number of threads
    int numt;
    //Synchronization data
    pthread_mutex_t mtx;
    pthread_cond_t workcond;
    pthread_cond_t readcond;
    int gowork;
    int goread;
    //Tracks how many threads are done analyzing data
    int doneq;
    /*
      Stores "data queues" (1 queue per thread)
      queue ->       [  [ char**    [ char**    [ char**    [ char**    [ char**
len(queue)=numt          [char*]     [char*]     [char*]     [char*]     [char*]
len(queue[n])=maxqueue   [char*]     [char*]     [char*]     [char*]     [char*]
len(queue[n][m])=data      ...         ...         ...         ...         ...
                         [char*]     [char*]     [char*]     [char*]     [char*]
                                 ]           ]           ]           ]          ]
                                                                                ]
    */
    char ***queue;
    //Internal thread ID
    int *threadidx;
    //Maximum number of lines to read
    int maxseqs;
    //Maximum number of lines per thread == maxseqs/numthreads
    int maxqueue;
} thread_t;

/*
Reads lines from text file, copies line content and length into a char * pointer
and adds it to an "analysis queue" to be processed by one of the "consumers"
*/
void *reader(void *threaddata);

/*
Extracts char * pointers from one of the "data queues". Does work with
the data and frees when done.
*/
void *consumer(void *threaddata);

/*
Initializes thread data
*/
int  threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs);

/*
Cleans thread data before exit
*/
void threadtkill(thread_t *threaddata);


int main(int argc, char *argv[])
{
    if (argc < 4) {
        fprintf(stderr, "ERROR: Not enough arguments.\n");
        exit(-1);
    }

    FILE *fp = fopen(argv[1], "r");
    if (!fp) {
        fprintf(stderr, "ERROR: Failed to open input file.\n");
        exit(-1);
    }

    int numt = atoi(argv[2]);
    if (!numt) {
        fprintf(stderr, "ERROR: Please specify number of threads.\n");
        fclose(fp);
        exit(-1);
    }

    int maxseqs = atoi(argv[3]);
    if (!maxseqs) {
        fprintf(stderr, "ERROR: Please specify max number of lines.\n");
        fclose(fp);
        exit(-1);
    }

    //Start data struct for threads
    thread_t threaddata;
    if (!threadtinit(fp, numt, &threaddata, maxseqs)) {
        fprintf(stderr, "ERROR: Could not initialize thread data.\n");
        fclose(fp);
        exit(-1);
    }
    fprintf(stderr, "Thread data initialized.\n");


    //return code
    int ret;

    //pthread creation
    pthread_t readerthread;
    pthread_t *consumerpool = NULL;
    consumerpool = malloc((numt)*sizeof(pthread_t));
    if (!consumerpool) {
        fprintf(stderr, "Failed to allocate threads.\n");
        ret = -1;
        goto exit;
    }

    // Initialize and set thread detached attribute
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    //Consumer threads
    int thrc;
    for (int i = 0; i < numt; i++) {
        thrc = pthread_create(consumerpool + i,
                              &attr,
                              consumer,
                              (void *)&threaddata);
        if (thrc) {
            fprintf(stderr, "ERROR: Thread creation.\n");
            ret = -1;
            goto exit;
        }
    }

    //Couple of sleeps to keep track of stuff while running
    sleep(1);

    //Reader thread
    thrc = pthread_create(&readerthread,
                          &attr,
                          reader,
                          (void *)&threaddata);
    if (thrc) {
        fprintf(stderr, "ERROR: Thread creation.\n");
        ret = -1;
        goto exit;
    }


    // Free attribute and wait for the other threads
    pthread_attr_destroy(&attr);

    int jrc;
    jrc = pthread_join(readerthread, NULL);
    if (jrc) {
        fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
    }
    for (int i = 0; i < numt; i++) {
        jrc = pthread_join(*(consumerpool + i), NULL);
        if (jrc) {
            fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
            ret = -1;
            goto exit;
        }
    }
    ret = 0;
    exit:
        threadtkill(&threaddata);
        free(consumerpool);
        fprintf(stderr, "Finished.\n");
        return(ret);
}


void *reader(void *readt)
{
    fprintf(stderr, "Reader thread started.\n");
    thread_t *threaddata = readt;
    int numt = threaddata->numt;
    int maxqueue = threaddata->maxqueue;
    int maxseqs = threaddata->maxseqs;
    FILE *fp = threaddata->fp;

    // Array of queues, one per consumer thread
    char ***queue = threaddata->queue;

    // Number of bytes used to store length of line
    size_t bytes = sizeof(ssize_t);
    // Tracks number of lines loaded so far
    size_t nlines = 0;

    // Tracks to which queue data should be added to
    int qtracker = 0;
    // Tracks to which position in any particular queue, data should be added
    int ltracker = 0;

    // Holds read line
    char *line = NULL;
    ssize_t linelength = 0;
    size_t n;

    // Tracks how much data will be read
    size_t totallength = 0;
    size_t totallines = 0;
    while ( (linelength =  getline(&line, &n, fp)) != -1 ) {
        // enough data is used to hold line contents + line length
        char *data = malloc(bytes + linelength + 1);

        if (!data) {
            fprintf(stderr, "memerr\n");
            continue;
        }
        // move line length bytes to data
        memcpy(data, &linelength, bytes);
        //move line bytes to data
        memcpy(data + bytes, line, linelength + 1);

        totallength += linelength;

        // Add newly allocated data to one of numt queues
        queue[qtracker][ltracker] = data;
        qtracker++;
        if (numt == qtracker) {
            // Loop around queue
            qtracker = 0;
            ltracker++;
            // Loop around positions in queue
            if (maxqueue == ltracker) ltracker = 0;
        }
        nlines++;
        // Stop reading thread and start consumer threads
        if (nlines == maxseqs) {
            fprintf(stderr, "%lu lines loaded\n", nlines);
            sleep(3);
            totallines += nlines;
            nlines = 0;
            fprintf(stderr, "Waking up consumers\n");
            pthread_mutex_lock(&(threaddata->mtx));
            //Wake consumer threads
            threaddata->gowork = 1;
            pthread_cond_broadcast(&(threaddata->workcond));
            //Wait for consumer threads to finish
            while ( !threaddata->goread ) {
                pthread_cond_wait(&(threaddata->readcond),
                                  &(threaddata->mtx));
            }
            fprintf(stderr, "Reader has awoken!!!!\n\n");
            sleep(3);
            threaddata->goread = 0;
            pthread_mutex_unlock(&(threaddata->mtx));
        }
    }

    //Add NULL pointers to the end of each queue to indicate reading is done
    pthread_mutex_lock(&(threaddata->mtx));
    for (int i = 0; i < numt; i++) {
        queue[i][ltracker] = NULL;
    }
    // Wake consumers for the last time
    threaddata->gowork = 1;
    pthread_cond_broadcast(&(threaddata->workcond));
    pthread_mutex_unlock(&(threaddata->mtx));

    // Log info
    fprintf(stderr, "%lu characters read.\n", totallength);
    if (line) free(line);
    pthread_exit(NULL);
}


void *consumer(void *consumert)
{
    thread_t *threaddata = consumert;
    // Number of consumer threads
    int numt = threaddata->numt;
    // Max length of queue to extract data from
    int maxqueue = threaddata->maxqueue;

    // Holds data sent by reader thread
    char *data = NULL;
    // Holds the actual line read
    char *line = NULL;
    size_t linelength;
    size_t bytes = sizeof(ssize_t);

    // get queue number for corresponding thread
    int qnum = -1;
    pthread_mutex_lock(&(threaddata->mtx));
    int *tlist = threaddata->threadidx;
    while (qnum == -1) {
        qnum = *tlist;
        *tlist = -1;
        tlist++;
    }
    fprintf(stderr, "Thread got queueID: %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));
    // Any thread works on only one and one queue only
    char **queue = threaddata->queue[qnum];

    //After initializing, wait for reader to start working
    pthread_mutex_lock(&(threaddata->mtx));
    while ( !threaddata->gowork) {
        pthread_cond_wait(&(threaddata->workcond), &(threaddata->mtx));
    }
    fprintf(stderr, "Consumer thread started queueID %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));

    // Tracks number of characters this thread consumes
    size_t totallength = 0;
    // Tracks from which position in queue data should be taken from
    size_t queuecounter = 1;
    // Get first data point
    data = queue[0];

    while (data != NULL) {
        //get line length
        memcpy(&linelength, data, bytes);

        //get line
        line = data + bytes;

        //Do work
        totallength += linelength;
        free(data);

        //Check for number of sequences analyzed
        if (queuecounter == maxqueue) {
            // Wait for other threads to catchup
            sleep(1);
            queuecounter = 0;
            pthread_mutex_lock(&(threaddata->mtx));
            threaddata->doneq++;
            threaddata->gowork = 0;
            // If this thread is the last one to be done with its queue, wake
            // reader
            if (threaddata->doneq == numt) {
                threaddata->goread = 1;
                pthread_cond_signal(&(threaddata->readcond));
                threaddata->doneq = 0;
            }
            // When done consuming data, wait for reader to load more
            while (!threaddata->gowork) {
                pthread_cond_wait(&(threaddata->workcond),
                                  &(threaddata->mtx));
            }
            pthread_mutex_unlock(&(threaddata->mtx));
        }
        //Get next line
        data = queue[queuecounter];
        queuecounter++;
    }

    // Log and exit
    fprintf(stderr, "\tThread %d analyzed %lu characters.\n", qnum, totallength);
    pthread_exit(NULL);
}


int  threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs)
{
    threaddata->fp = fp;
    //Determine maximum thread queue length
    threaddata->maxqueue = ceil((float)maxseqs/numt);
    threaddata->maxseqs = threaddata->maxqueue*numt;
    fprintf(stderr, "max lines to load: %d\n", threaddata->maxseqs);
    fprintf(stderr, "max lines per thread: %d\n", threaddata->maxqueue);
    threaddata->numt = numt;
    //Allocate data for queues and initialize them
    threaddata->queue = malloc(numt*sizeof(char **));
    threaddata->threadidx = malloc(numt*sizeof(int));
    for (int i = 0; i < numt; i++) {
        threaddata->queue[i] = malloc(threaddata->maxqueue*sizeof(char *));
        threaddata->threadidx[i] = i;
    }
    //Initialize synchronization data
    pthread_mutex_init(&(threaddata->mtx), NULL);
    pthread_cond_init(&(threaddata->workcond), NULL);
    pthread_cond_init(&(threaddata->readcond), NULL);
    threaddata->gowork = 0;
    threaddata->goread = 0;
    threaddata->doneq = 0;
    return 1;
}


void threadtkill(thread_t *threaddata)
{
    fclose(threaddata->fp);
    for (int i = 0; i < threaddata->numt; i++) {
        free(threaddata->queue[i]);
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    pthread_mutex_destroy(&(threaddata->mtx));
}
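
The reader and consumer above exchange each line as a single heap block that holds the line length followed by the line bytes. A minimal sketch of that layout in isolation is below; the helper names pack_line and unpack_line are hypothetical and do not appear in the program, and the sketch assumes the same sizeof(ssize_t) prefix the program uses.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

/* Hypothetical helper: copy a line into a new block laid out as
   [ssize_t length][line bytes][NUL]. The caller owns the block. */
char *pack_line(const char *line, ssize_t linelength)
{
    assert(line != NULL && linelength >= 0);
    size_t bytes = sizeof(ssize_t);
    char *data = malloc(bytes + (size_t)linelength + 1);
    if (!data) return NULL;
    /* length prefix first, then the line itself */
    memcpy(data, &linelength, bytes);
    memcpy(data + bytes, line, (size_t)linelength);
    data[bytes + (size_t)linelength] = '\0';
    return data;
}

/* Hypothetical helper: read the length prefix back and return a pointer
   to the line bytes inside the same block (never free it separately). */
const char *unpack_line(const char *data, ssize_t *linelength)
{
    assert(data != NULL && linelength != NULL);
    memcpy(linelength, data, sizeof(ssize_t));
    return data + sizeof(ssize_t);
}
```

Because the line pointer returned by unpack_line points into the packed block, a single free() on the block releases both the prefix and the line.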

This line looks suspicious:

if (length == ltracker) ltracker++;

I would normally expect to see:

if (length == ltracker) ltracker = 0; /* wrap */

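But without the full context, it is hard to tell. Also, it is pretty clear that with all of this you are racing between producer and consumer, which may be harder to debug than your current problem.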
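
One way to get both the wrap-around and a race-free hand-off is a small bounded ring guarded by a mutex and two condition variables. This is only a sketch under assumptions: ring_t, RING_CAP and the ring_* functions are hypothetical names, and the capacity of 4 is arbitrary.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define RING_CAP 4  /* hypothetical capacity, chosen for illustration */

typedef struct {
    char *slot[RING_CAP];
    size_t head;   /* next position to pop */
    size_t tail;   /* next position to push */
    size_t count;  /* items currently stored */
    pthread_mutex_t mtx;
    pthread_cond_t not_empty;
    pthread_cond_t not_full;
} ring_t;

void ring_init(ring_t *r)
{
    assert(r != NULL);
    r->head = r->tail = r->count = 0;
    pthread_mutex_init(&r->mtx, NULL);
    pthread_cond_init(&r->not_empty, NULL);
    pthread_cond_init(&r->not_full, NULL);
}

/* Blocks while the ring is full, so the producer never overwrites a
   slot the consumer has not taken yet. */
void ring_push(ring_t *r, char *item)
{
    pthread_mutex_lock(&r->mtx);
    while (r->count == RING_CAP)
        pthread_cond_wait(&r->not_full, &r->mtx);
    r->slot[r->tail] = item;
    r->tail = (r->tail + 1) % RING_CAP;  /* wrap */
    r->count++;
    pthread_cond_signal(&r->not_empty);
    pthread_mutex_unlock(&r->mtx);
}

/* Blocks while the ring is empty; returns items in FIFO order. */
char *ring_pop(ring_t *r)
{
    pthread_mutex_lock(&r->mtx);
    while (r->count == 0)
        pthread_cond_wait(&r->not_empty, &r->mtx);
    char *item = r->slot[r->head];
    r->head = (r->head + 1) % RING_CAP;  /* wrap */
    r->count--;
    pthread_cond_signal(&r->not_full);
    pthread_mutex_unlock(&r->mtx);
    return item;
}

void ring_destroy(ring_t *r)
{
    pthread_cond_destroy(&r->not_full);
    pthread_cond_destroy(&r->not_empty);
    pthread_mutex_destroy(&r->mtx);
}
```

With one such ring per consumer, the reader would call ring_push and each consumer ring_pop, and a NULL item could still serve as the end-of-data marker.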

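Since you have gone up to three levels of indirection, you do recognize that your buffer space is O(n^3), and that free() rarely shrinks your process's memory. free() generally only lets the allocator recycle previously allocated heap, so your program will grow until it no longer needs to ask the system for more memory, and then stay at that size.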
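
To observe this from inside a process, one option is to compare the resident set size before and after a burst of allocations and frees. The sketch below assumes Linux (it reads /proc/self/statm); resident_kib and churn are hypothetical helper names, and the numbers you see depend on the allocator and its settings.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Resident set size in KiB, read from /proc/self/statm (Linux-specific).
   Returns -1 if the file cannot be read. */
long resident_kib(void)
{
    FILE *f = fopen("/proc/self/statm", "r");
    if (!f) return -1;
    long size_pages = 0, resident_pages = 0;
    int got = fscanf(f, "%ld %ld", &size_pages, &resident_pages);
    fclose(f);
    if (got != 2) return -1;
    return resident_pages * (sysconf(_SC_PAGESIZE) / 1024);
}

/* Allocate `count` blocks of `size` bytes, touch them, then free them all.
   Returns 0 on success, -1 if any allocation failed. */
int churn(size_t count, size_t size)
{
    assert(count > 0 && size > 0);
    char **blocks = malloc(count * sizeof *blocks);
    if (!blocks) return -1;
    size_t made = 0;
    for (; made < count; made++) {
        blocks[made] = malloc(size);
        if (!blocks[made]) break;
        memset(blocks[made], 1, size);  /* touch pages so they become resident */
    }
    for (size_t i = 0; i < made; i++)
        free(blocks[i]);
    free(blocks);
    return made == count ? 0 : -1;
}
```

Printing resident_kib() before and after a call such as churn(1000000, 64) shows whether the freed memory went back to the system or stayed with the process for reuse.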

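Note that the following focuses only on the code snippets you call the reader and consumer threads, although, as pointed out in the comments and the other answer, there are other potential sources of the problem that should be reviewed...

In your reader thread: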

while (loaddata(data) >= 0) {
    char *datapoint = malloc(data->length);
    ...
    // Note: there are no free(datapoint); calls in this loop
}

Clearly, datapoint is created in this block but is not freed in this block.

The following may be contributing factors to the memory leak:

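  • Because the instance of datapoint in the reader thread is created inside a block, it lives only within that block. The memory created at that address continues to be owned by the process that created it, but outside that block the pointer variable pointing to that memory no longer exists, so it cannot be freed outside that block. And because I see no call to free(datapoint) inside that block, it is never freed.

  • Compounding this, char *datapoint = malloc(data->length); is called in a loop (with no calls to free in between), each time creating new memory at a new address while overwriting the address that referenced its predecessor, making it impossible to free all of the previous allocations.

  • The instance of datapoint in the consumer thread, although it has the same symbol as the one in the reader thread, does not point to the same memory space. So even if that variable is freed, it does not free the instance of datapoint that exists in the reader thread.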
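
Whether the block-scoped variable matters depends on whether its value is copied somewhere longer-lived before the block ends; the question's snippet does store it in queue[qtracker][ltracker]. A minimal sketch of that kind of hand-off follows, with hypothetical helpers produce_into and consume_from standing in for the two threads.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical producer step: the local variable `datapoint` disappears
   when the function returns, but its value (the address) has been
   copied into *slot, so the block is still reachable. */
int produce_into(char **slot, const char *content, size_t length)
{
    assert(slot != NULL && content != NULL);
    char *datapoint = malloc(length + 1);
    if (!datapoint) return -1;
    memcpy(datapoint, content, length);
    datapoint[length] = '\0';
    *slot = datapoint;
    return 0;
}

/* Hypothetical consumer step: frees the block through the slot and
   clears the slot. Returns the string length the block held. */
size_t consume_from(char **slot)
{
    assert(slot != NULL && *slot != NULL);
    size_t n = strlen(*slot);
    free(*slot);
    *slot = NULL;
    return n;
}
```

If the producer never stored the address anywhere, the block would indeed be unreachable once the variable went out of scope; the stored copy is what lets another thread free it later.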

Code excerpted from the consumer thread:

datapoint = dataqueue[datatracker]  //Note no ";" making this code uncompilable
                                    //forcing the conclusion that code posted
                                    //is not code actually used, 
                                    //Also: where is this instance of datapoint
                                    //created ?
datatracker++;
while (datapoint != NULL) {
    //Do work on data
    free(datapoint);
    datapoint = dataqueue[datatracker];
    datatracker++;

    //More synchronization code
}

Per the questions in the comments, and for general Linux threading information:
Why doesn't Valgrind detect memory leaks, SO question
passing data between threads question
Creating threads in Linux tutorial
Linux Tutorial: POSIX Threads

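There is nothing wrong with my code per se. Calling free() after malloc() releases memory on the heap for reuse by the program, but that does not mean it goes back to the system. The reason for this is still a bit beyond my understanding.

Valgrind was not reporting memory leaks because there are none.

After doing some research, reading more about the nature of dynamic memory allocation, and landing on these questions: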

Will malloc implementations return free-ed memory back to the system?

Memory not freed after calling free()

Calling malloc_trim() after each free was enough for the system to reclaim the allocated memory.
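
A minimal sketch of how that can look, assuming glibc (malloc_trim() is a glibc extension declared in <malloc.h>). The helper release_batch is a hypothetical name, and trimming once per batch rather than after every single free() is my assumption to keep the overhead down, not what the program above does.

```c
#include <assert.h>
#include <malloc.h>   /* malloc_trim() is a glibc extension */
#include <stdlib.h>

/* Hypothetical helper: free `n` blocks, then ask glibc to hand free
   heap memory back to the OS. Returns malloc_trim()'s result:
   1 if some memory was released, 0 otherwise. */
int release_batch(char **items, size_t n)
{
    assert(items != NULL);
    for (size_t i = 0; i < n; i++) {
        free(items[i]);
        items[i] = NULL;  /* avoid dangling pointers in the queue */
    }
    /* pad = 0: keep no extra free space at the top of the heap */
    return malloc_trim(0);
}
```

A consumer could call release_batch on its queue once it has processed all maxqueue entries, just before signalling the reader.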

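For example, without calling malloc_trim(), the CPU and memory usage of my program look like this: every time my "reader" thread runs (first peak in CPU usage), some memory is allocated. Running my "consumer" threads frees the requested memory, but the memory is not always returned to the system, as the blue line in the plot shows.

With malloc_trim() after each free(), memory usage looks the way I expected: while the "reader" thread is executing, the memory associated with the process increases. While the "consumers" are running, memory is freed and returned to the OS.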