使用多处理并行化两个数据帧之间的比较

Parallelizing comparisons between two dataframes with multiprocessing

我有以下函数,允许我在两个数据帧(dataref)的行和 return 两行的索引之间进行一些比较,如果有一场比赛。

def get_gene(row):

    m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)

    return ref.index[m] if m.any() else None

作为一个耗时的过程(data 中 160 万行 25 分钟,而 ref 中 2 万行),我试图通过并行计算来加快速度。由于 pandas 本身不支持多处理,我使用了我在 SO 上找到的这段代码,它与我的函数 get_gene.

一起工作正常
def _apply_df(args):
    df, func, kwargs = args
    return df.apply(func, **kwargs)


def apply_by_multiprocessing(df, func, **kwargs):

    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)

    result = pool.map(_apply_df, [(d, func, kwargs) for d in np.array_split(df, workers)])

    pool.close()

    df = pd.concat(list(result))

    return df

它让我的计算时间减少到 9 分钟。但是,如果我理解正确的话,这段代码只是将我的数据帧 data 分解为 4 个部分,并将每个部分发送到 CPU 的每个核心。因此,每个内核最终会在 400K 行(从 data 分成 4)与 20K 行(ref)之间进行比较。

我实际上想做的是根据其中一列中的值拆分两个数据帧,这样我只计算相同 'group':

的数据帧之间的比较

这会减少计算量。 data 中的每一行只能与 ref 中的约 3K 行匹配,而不是全部 20K 行。

因此,我尝试修改上面的代码,但无法使其正常工作。

def apply_get_gene(df, func, **kwargs):

    reference = pd.read_csv('genomic_positions.csv', index_col=0)
    reference = reference.groupby(['Chr'])

    df = df.groupby(['Chr'])
    chromosome = df.groups.keys()



    workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=workers)


    args_list = [(df.get_group(chrom), func, kwargs, reference.get_group(chrom)) for chrom in chromosome]

    results = pool.map(_apply_df, args_list)

    pool.close()                                                          
    pool.join()                                                           

    return pd.concat(results)


def _apply_df(args):

    df, func, kwarg1, kwarg2 = args

    return df.apply(func, **kwargs)


def get_gene(row, ref):

    m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)

    return ref.index[m] if m.any() else None

我很确定它与 *args**kwargs 通过不同函数传递的方式有关(因为在这种情况下我必须考虑到我想要的将我的拆分 ref 数据帧与拆分的 data 数据帧传递..)。 我认为问题出在函数 _apply_df 中。我以为我理解它的真正作用,但是 df, func, kwargs = args 行仍然困扰着我,我想我没有正确修改它..

感谢所有建议!

看看starmap():

starmap(func, iterable[, chunksize]) Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments.

Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].

这似乎正是您所需要的。

我 post 我为可能偶然发现此问题的读者提出的答案 post:

正如@Michele Tonutti 所指出的,我只需要使用 starmap() 并在这里和那里做一些调整。权衡是它仅应用我的自定义函数 get_gene 和设置 axis=1,但如果需要,可能有一种方法可以使其更灵活。

def Detect_gene(data):

    reference = pd.read_csv('genomic_positions.csv', index_col=0)
    ref = reference.groupby(['Chr'])

    df = data.groupby(['Chr'])
    chromosome = df.groups.keys()

    workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=workers)


    args = [(df.get_group(chrom), ref.get_group(chrom)) 
            for chrom in chromosome]

    results = pool.starmap(apply_get_gene, args)

    pool.close()                                                          
    pool.join()                                                           

    return pd.concat(results)


def apply_get_gene(df, a):

    return df.apply(get_gene, axis=1, ref=a)


def get_gene(row, ref):

    m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)

    return ref.index[m] if m.any() else None

现在使用旧版本的代码需要大约 5 分钟而不是大约 9 分钟,如果没有多处理则需要大约 25 分钟。