如果我的 CPU 负载另有建议,我应该启动多个线程吗?

should I launch multiple threads if my CPU load suggests otherwise?

我正在用 C++ 编写一个程序,该程序根据参考注释计算 NGS 读取比对。基本上,程序将注释和对齐文件都读入内存,遍历注释,二进制搜索对齐文件中的可能位置,找到该位置后线性搜索围绕该可能位置的帧。

通常我想让这个框架稍微大一点(10000 次对齐),所以我想到了将框架拆分并将其部分放入单独的线程中。

一切都编译并运行,但我的多线程看起来不像预期的那样工作,因为我的 comp 使用一个核心来完成这项工作。谁能帮我弄清楚我在哪里实现线程错误。

https://sourceforge.net/projects/fast-count/?source=directory

#include <iostream>
#include <cstdlib>
#include <vector>
#include <string>
#include <thread>
#include <sstream>
#include <fstream>
#include <math.h> 
#include "api/BamReader.h"

using namespace std;
using namespace BamTools;

int hit_count = 0;

struct bam_headers{

    string chr;
    int start;

};

struct thread_data{

   int thread_id;
   int total_thread;
   int start_gtf;
   int stop_gtf;

};

struct gtf_headers{

    string chr;
    string source;
    string feature;
    string score;
    string strand;
    string frame;
    string annotation;
    int start;
    int end;

};

void process(int* start_holder, int size, int gtf_start, int gtf_stop){

    //threaded counter process

    for (int t = 0; t < size; t++){
        if((start_holder[t] >= gtf_start) && (start_holder[t] <= gtf_stop)){
            hit_count++;
        }
    }

}

vector <string> find_index(vector <vector <bam_headers> > bams){

    //define vector for bam_index to chromosome

    vector <string> compute_holder;
    for (int bam_idx = 0; bam_idx < bams.size();bam_idx++){
        compute_holder.push_back(bams[bam_idx][0].chr);
    }
    return compute_holder;

}

vector <gtf_headers> load_gtf(char* filename){

    //define matrix to memory holding gtf annotations by assoc. header

    vector<gtf_headers> push_matrix;
    gtf_headers holder;
    ifstream gtf_file(filename);
    string line;

    cout << "Loading GTF to memory" << "\n";
    if (gtf_file.is_open()){
        int sub_count = 0;
        string transfer_hold[8];
        while(getline(gtf_file,line)){
            //iterate through file
            istringstream iss(line);
            string token;
            //iterate through line, and tokenize by tab delimitor
            while(getline(iss,token,'\t')){
                if (sub_count == 8){
                    //assign to hold struct, and push to vector
                    holder.chr = transfer_hold[0];
                    holder.source = transfer_hold[1];
                    holder.feature = transfer_hold[2];
                    holder.start = atoi(transfer_hold[3].c_str());
                    holder.end = atoi(transfer_hold[4].c_str());
                    holder.score = transfer_hold[5];
                    holder.strand = transfer_hold[6];
                    holder.frame = transfer_hold[7];
                    holder.annotation = token;
                    push_matrix.push_back(holder);
                    sub_count = 0;
                } else {
                    //temporarily hold tokens
                    transfer_hold[sub_count] = token;
                    ++sub_count;
                }
            }
        }
        cout << "GTF successfully loaded to memory" << "\n";
        gtf_file.close();
        return(push_matrix);
    }else{
        cout << "GTF unsuccessfully loaded to memory. Check path to file, and annotation format. Exiting" << "\n";
        exit(-1);
    }
}

vector <vector <bam_headers>> load_bam(char* filename){

    //parse individual bam file to chromosome bins

    vector <vector <bam_headers> > push_matrix;
    vector <bam_headers> iter_chr;
    int iter_refid = -1;
    bam_headers bam_holder;
    BamReader reader;
    BamAlignment al;
    const vector<RefData>& references = reader.GetReferenceData();

    cout << "Loading " << filename << " to memory" << "\n";
    if (reader.Open(filename)) {    
        while (reader.GetNextAlignmentCore(al)) {
            if (al.IsMapped()){
                //bam file must be sorted by chr. otherwise the lookup will segfault
                if(al.RefID != iter_refid){
                    //check if chr. position has advanced in the bam file, if true, push empty vector
                    iter_refid++;
                    push_matrix.push_back(iter_chr);
                }else{
                    //if chr. position hasn't advanced push to current index in 2d vector
                    bam_holder.chr = references[al.RefID].RefName;
                    bam_holder.start = al.Position;
                    push_matrix.at(iter_refid).push_back(bam_holder);
                }
            }
        }
        reader.Close();
        cout << "Successfully loaded " << filename << " to memory" << "\n";
        return(push_matrix);
    }else{
        cout << "Could not open input BAM file. Exiting." << endl;
        exit(-1);
    }

}

short int find_bin(const string & gtf_chr, const vector <string> mapping){

    //determines which chr. bin the gtf line is associated with 

    int bin_compare = -1;
    for (int i = 0; i < mapping.size(); i++){
        if(gtf_chr == mapping[i]){ 
            bin_compare = i;
        }
    }
    return(bin_compare);

}

int find_frame(gtf_headers gtf_matrix, vector <bam_headers> bam_file_bin){

    //binary search to find alignment index with greater and less than gtf position

    int bin_size = bam_file_bin.size();
    int high_end = bin_size;
    int low_end = 0;
    int binary_i = bin_size / 2;
    int repeat = 0;
    int frame_start;
    bool found = false;

    while (found != true){
        if ((bam_file_bin[binary_i].start >= gtf_matrix.start) && (bam_file_bin[binary_i].start <= gtf_matrix.end)){
            frame_start = binary_i;
            found = true;
        }else{
            if(repeat != binary_i){
                if(bam_file_bin[binary_i].start > gtf_matrix.end){
                    if(repeat != binary_i){
                        repeat = binary_i;
                        high_end = binary_i;
                        binary_i = ((high_end - low_end) / 2) + low_end;
                    }
                }else{
                    if(repeat != binary_i){
                        repeat = binary_i;
                        low_end = binary_i;
                        binary_i = ((high_end - low_end) / 2) + low_end;
                    }
                }
            }else{
                frame_start = low_end; 
                found = true;
            }
        }   
    }
    return(frame_start);
}

vector <int > define_frame(int frame_size, int frame_start, int bam_matrix){

    //define the frame for the search
    vector <int> push_ints;
    push_ints.push_back(frame_start - (frame_size / 2)); 
    push_ints.push_back(frame_start + (frame_size / 2)); 
    if(push_ints[0] < 0){
        push_ints[0] = 0;
        push_ints[1] = frame_size;
        if(push_ints[1] > bam_matrix){
            push_ints[1] = frame_size;
        }
    } 
    if(push_ints[1] > bam_matrix){
        push_ints[1] = bam_matrix;
        push_ints[0] = bam_matrix - (frame_size / 2);
        if(push_ints[0] < 0){
            push_ints[0] = 0;
        }
    }
    return(push_ints);

}

void thread_handler(int nthread, vector <int> frame, vector <bam_headers> bam_matrix, gtf_headers gtf_record){

    int thread_divide = frame[1]-frame[0];//frame_size / nthread;
    int thread_remain = (frame[1]-frame[0]) % nthread;
    int* start_holder = new int[thread_divide];

    for(int i = 0; i < nthread; i++){
        if (i < nthread - 1){
            for (int frame_index = 0; frame_index < thread_divide; frame_index++){
                 start_holder[frame_index] = bam_matrix[frame[0]+frame_index].start;         
            } 
            frame[0] = frame[0] + thread_divide;
            thread first(process, start_holder,thread_divide,gtf_record.start,gtf_record.end);
            first.join();
        }else{
            for (int frame_index = 0; frame_index < thread_divide + thread_remain; frame_index++){
                 start_holder[frame_index] = bam_matrix[frame[0]+frame_index].start;    
            } 
            thread last(process, start_holder,thread_divide + thread_remain,gtf_record.start,gtf_record.end);
            last.join();
        }
    }

}



int main (int argc, char *argv[])
{

    // usage
    // ./count threads frame_size gtf_file files

    //define matrix to memory holding gtf annotations by assoc. header
    vector <gtf_headers> gtf_matrix = load_gtf(argv[3]);

    //load bam, perform counts
    for(int i = 4;i < argc;i++){

        //iterate through filenames in argv, define matrix to memory holding bam alignments chr and bp position
        vector <vector <bam_headers> > bam_matrix = load_bam(argv[i]);

        //map chromosome to bam matrix index
        vector <string> index_mapping = find_index(bam_matrix);

        //iterate through gtf matrix, find corresponding bins for chr, set search frames, and count
        for(int gtf_i = 0; gtf_i < gtf_i < gtf_matrix.size();gtf_i++){ //gtf_i < gtf_matrix.size()

            hit_count = 0;
            //find corresponding bins for gtf chr
            short int bin_compare = find_bin(gtf_matrix[gtf_i].chr,index_mapping);

            if(bin_compare != -1){

                //find start of search frame
                int frame_start = find_frame(gtf_matrix[gtf_i], bam_matrix[bin_compare]);

                //get up lower bounds of search frame;
                vector <int> full_frame = define_frame(atoi(argv[2]),frame_start,bam_matrix[bin_compare].size());

                //create c array of bam positional data for the frame, and post to thread process
                thread_handler(atoi(argv[1]),full_frame,bam_matrix[bin_compare],gtf_matrix[gtf_i]);

            }

            //counts displayed in STOUT
            cout << gtf_matrix[gtf_i].chr << "\t" << gtf_matrix[gtf_i].source << "\t" << gtf_matrix[gtf_i].feature << "\t" << gtf_matrix[gtf_i].start << "\t" << gtf_matrix[gtf_i].end << "\t" << gtf_matrix[gtf_i].score << "\t" << gtf_matrix[gtf_i].strand << "\t" << gtf_matrix[gtf_i].frame << "\t" << gtf_matrix[gtf_i].annotation << "\t" << hit_count << "\n";

        }
    }
}

你的问题的答案很简单:

thread last(process, start_holder,thread_divide + thread_remain,gtf_record.start,gtf_record.end);
last.join();

在这里,父任务创建了一个新线程,并且...立即等待线程完成。这就是 join() 所做的,它等待线程终止。

因此,您的代码会启动一个新线程,并立即等待它完成,然后再做任何其他事情,例如启动下一个线程。

你需要重写thread_handler()来实例化所有的std::thread实例,然后在实例化所有实例之后,在每个实例上调用join(),等待所有实例完成。

典型的方法是使用 std::thread 的默认构造函数预先创建所有线程实例的 std::vector,然后遍历它们以初始化每个实例,然后再次遍历它们,调用 join() 每一个。