使用多处理并行化两个数据帧之间的比较
Parallelizing comparisons between two dataframes with multiprocessing
我有以下函数,允许我在两个数据帧(data
和 ref
)的行和 return 两行的索引之间进行一些比较,如果有一场比赛。
def get_gene(row):
m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)
return ref.index[m] if m.any() else None
作为一个耗时的过程(data
中 160 万行 25 分钟,而 ref
中 2 万行),我试图通过并行计算来加快速度。由于 pandas 本身不支持多处理,我使用了我在 SO 上找到的这段代码,它与我的函数 get_gene
.
一起工作正常
def _apply_df(args):
df, func, kwargs = args
return df.apply(func, **kwargs)
def apply_by_multiprocessing(df, func, **kwargs):
workers = kwargs.pop('workers')
pool = multiprocessing.Pool(processes=workers)
result = pool.map(_apply_df, [(d, func, kwargs) for d in np.array_split(df, workers)])
pool.close()
df = pd.concat(list(result))
return df
它让我的计算时间减少到 9 分钟。但是,如果我理解正确的话,这段代码只是将我的数据帧 data
分解为 4 个部分,并将每个部分发送到 CPU 的每个核心。因此,每个内核最终会在 400K 行(从 data
分成 4)与 20K 行(ref
)之间进行比较。
我实际上想做的是根据其中一列中的值拆分两个数据帧,这样我只计算相同 'group':
的数据帧之间的比较
data.get_group(['a'])
对比 ref.get_group(['a'])
data.get_group(['b'])
对比 ref.get_group(['b'])
data.get_group(['c'])
对比 ref.get_group(['c'])
等...
这会减少计算量。 data
中的每一行只能与 ref
中的约 3K 行匹配,而不是全部 20K 行。
因此,我尝试修改上面的代码,但无法使其正常工作。
def apply_get_gene(df, func, **kwargs):
reference = pd.read_csv('genomic_positions.csv', index_col=0)
reference = reference.groupby(['Chr'])
df = df.groupby(['Chr'])
chromosome = df.groups.keys()
workers = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=workers)
args_list = [(df.get_group(chrom), func, kwargs, reference.get_group(chrom)) for chrom in chromosome]
results = pool.map(_apply_df, args_list)
pool.close()
pool.join()
return pd.concat(results)
def _apply_df(args):
df, func, kwarg1, kwarg2 = args
return df.apply(func, **kwargs)
def get_gene(row, ref):
m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)
return ref.index[m] if m.any() else None
我很确定它与 *args
和 **kwargs
通过不同函数传递的方式有关(因为在这种情况下我必须考虑到我想要的将我的拆分 ref
数据帧与拆分的 data
数据帧传递..)。
我认为问题出在函数 _apply_df
中。我以为我理解它的真正作用,但是 df, func, kwargs = args
行仍然困扰着我,我想我没有正确修改它..
感谢所有建议!
看看starmap()
:
starmap(func, iterable[, chunksize])
Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments.
Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].
这似乎正是您所需要的。
我 post 我为可能偶然发现此问题的读者提出的答案 post:
正如@Michele Tonutti 所指出的,我只需要使用 starmap()
并在这里和那里做一些调整。权衡是它仅应用我的自定义函数 get_gene
和设置 axis=1
,但如果需要,可能有一种方法可以使其更灵活。
def Detect_gene(data):
reference = pd.read_csv('genomic_positions.csv', index_col=0)
ref = reference.groupby(['Chr'])
df = data.groupby(['Chr'])
chromosome = df.groups.keys()
workers = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=workers)
args = [(df.get_group(chrom), ref.get_group(chrom))
for chrom in chromosome]
results = pool.starmap(apply_get_gene, args)
pool.close()
pool.join()
return pd.concat(results)
def apply_get_gene(df, a):
return df.apply(get_gene, axis=1, ref=a)
def get_gene(row, ref):
m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)
return ref.index[m] if m.any() else None
现在使用旧版本的代码需要大约 5 分钟而不是大约 9 分钟,如果没有多处理则需要大约 25 分钟。
我有以下函数,允许我在两个数据帧(data
和 ref
)的行和 return 两行的索引之间进行一些比较,如果有一场比赛。
def get_gene(row):
m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)
return ref.index[m] if m.any() else None
作为一个耗时的过程(data
中 160 万行 25 分钟,而 ref
中 2 万行),我试图通过并行计算来加快速度。由于 pandas 本身不支持多处理,我使用了我在 SO 上找到的这段代码,它与我的函数 get_gene
.
def _apply_df(args):
df, func, kwargs = args
return df.apply(func, **kwargs)
def apply_by_multiprocessing(df, func, **kwargs):
workers = kwargs.pop('workers')
pool = multiprocessing.Pool(processes=workers)
result = pool.map(_apply_df, [(d, func, kwargs) for d in np.array_split(df, workers)])
pool.close()
df = pd.concat(list(result))
return df
它让我的计算时间减少到 9 分钟。但是,如果我理解正确的话,这段代码只是将我的数据帧 data
分解为 4 个部分,并将每个部分发送到 CPU 的每个核心。因此,每个内核最终会在 400K 行(从 data
分成 4)与 20K 行(ref
)之间进行比较。
我实际上想做的是根据其中一列中的值拆分两个数据帧,这样我只计算相同 'group':
的数据帧之间的比较data.get_group(['a'])
对比ref.get_group(['a'])
data.get_group(['b'])
对比ref.get_group(['b'])
data.get_group(['c'])
对比ref.get_group(['c'])
等...
这会减少计算量。 data
中的每一行只能与 ref
中的约 3K 行匹配,而不是全部 20K 行。
因此,我尝试修改上面的代码,但无法使其正常工作。
def apply_get_gene(df, func, **kwargs):
reference = pd.read_csv('genomic_positions.csv', index_col=0)
reference = reference.groupby(['Chr'])
df = df.groupby(['Chr'])
chromosome = df.groups.keys()
workers = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=workers)
args_list = [(df.get_group(chrom), func, kwargs, reference.get_group(chrom)) for chrom in chromosome]
results = pool.map(_apply_df, args_list)
pool.close()
pool.join()
return pd.concat(results)
def _apply_df(args):
df, func, kwarg1, kwarg2 = args
return df.apply(func, **kwargs)
def get_gene(row, ref):
m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)
return ref.index[m] if m.any() else None
我很确定它与 *args
和 **kwargs
通过不同函数传递的方式有关(因为在这种情况下我必须考虑到我想要的将我的拆分 ref
数据帧与拆分的 data
数据帧传递..)。
我认为问题出在函数 _apply_df
中。我以为我理解它的真正作用,但是 df, func, kwargs = args
行仍然困扰着我,我想我没有正确修改它..
感谢所有建议!
看看starmap()
:
starmap(func, iterable[, chunksize]) Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments.
Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].
这似乎正是您所需要的。
我 post 我为可能偶然发现此问题的读者提出的答案 post:
正如@Michele Tonutti 所指出的,我只需要使用 starmap()
并在这里和那里做一些调整。权衡是它仅应用我的自定义函数 get_gene
和设置 axis=1
,但如果需要,可能有一种方法可以使其更灵活。
def Detect_gene(data):
reference = pd.read_csv('genomic_positions.csv', index_col=0)
ref = reference.groupby(['Chr'])
df = data.groupby(['Chr'])
chromosome = df.groups.keys()
workers = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=workers)
args = [(df.get_group(chrom), ref.get_group(chrom))
for chrom in chromosome]
results = pool.starmap(apply_get_gene, args)
pool.close()
pool.join()
return pd.concat(results)
def apply_get_gene(df, a):
return df.apply(get_gene, axis=1, ref=a)
def get_gene(row, ref):
m = np.equal(row[0], ref.iloc[:,0].values) & np.greater_equal(row[2], ref.iloc[:,2].values) & np.less_equal(row[3], ref.iloc[:,3].values)
return ref.index[m] if m.any() else None
现在使用旧版本的代码需要大约 5 分钟而不是大约 9 分钟,如果没有多处理则需要大约 25 分钟。