python 使用 map 的多进程,但有一个子进程 运行
python multiprocess using map, but with one sub-process running
我是新手 python map()
函数实现并行代码
def main_function(sample):
# ......(only input file; calculations; and output file)
if __name__ == "__main__":
list_sample_common = os.listdir('/lustre/scratch/Stat/s1155136154/ONT_Panel2')# WES,ONT_panel, Pacibo_Panel intersection.
list_sample_Pacibo_normal = ['RMH12', 'RMH15','RMH20','RMH25','RMH3.','RMH7.','RMH9.']# normal people sample
list_sample_ONT_cDNA_only = ['RM66T','RM68T','RM77T']
sample = list_sample_common + list_sample_Pacibo_normal + list_sample_ONT_cDNA_only
pool=Pool()
pool.map(main_function,sample)
pool.close()
pool.join()
所以当我第一次在集群上使用它时,子进程是 运行 500% CPU(因为我在集群中应用了 5 个核心)。
然而,一段时间后,只有一个核心运行ning:
所以,这是因为包含输出和输入操作的主函数?而且由于主进程只将短列表传递给子函数,我相信参数大小不会影响速度。
这只是一个有根据的猜测,因为我对 sample
的大小以及您的工作函数 main_function
正在执行的工作的详细信息知之甚少
假设您要传递给 Pool.map
方法的 iterable、sample
的长度为 70,正如您所说,您的池大小是5. map
方法将 70 个任务分成 chunksize
大小的任务组,将这些块分配给池中的 5 个进程中的每一个。如果您没有为 map
方法指定 chunksize
参数,它会根据可迭代对象的大小 (70) 和池 (5) 的大小计算值,如下所示:
def compute_chunksize(iterable_size, pool_size):
chunksize, remainder = divmod(iterable_size, pool_size * 4)
if remainder:
chunksize += 1
return chunksize
因此对于您的值,chunksize
将是 4。因此将有 17 个大小为 4 的任务块和一个较小的第 18 个大小为 2 的任务块分布在 5 个进程中(每列是一个队列池中给定进程的任务):
4 4 4 4 4
4 4 4 4 4
4 4 4 4 4
4 4 2
假设所有任务花费相同的时间来处理,您可以看到在一定时间后,最后 2 个进程将完成分配给它们的 12 个任务,现在处于空闲状态,您将 运行宁只有 60%。最终,第三个进程将完成其任务,您现在 运行 将达到 40%。
但是您可以看到 sample
大小和池大小的正确组合,您可能会遇到这样一种情况,即您只会 运行 一个进程。较大的 chunksize
值会加剧这种情况,这些值旨在减少排队任务所需的共享内存访问次数,但可能导致 CPU 利用率低下。
作为一项实验,尝试重新运行您的程序,为您的 map
调用明确指定一个 chunksize
参数 1。除非任务数量是您的池大小的倍数,并且每个任务都需要相同的时间才能完成,否则您不能指望每个处理器都有一个任务要 运行。 事实上,很少会出现这样的情况,即您有一些 其他 而不是只剩下一个进程 运行 完成最后一项任务。 但这应该会减少只有一个处理器忙碌的时间百分比。但是使用 1 的 chunksize
被认为对于大型迭代来说效率低下。
带有 4 个进程池的演示,其中第一个进程获得所有长运行 任务
此处提交了 16 个任务,chunksize
为 4,池大小为 4,因此第一个进程将前 4 个任务提交给 运行,这些任务被人为设置为 10 次比其他人更长 运行ning。我们 return 一个与子流程关联的标识符,以证明一个特定流程正在处理前 4 个任务:
from multiprocessing import Pool, current_process
import re
import time
def get_id():
m = re.search(r'SpawnPoolWorker-(\d+)', str(current_process()))
return int(m[1])
def worker(i):
R = 10000000
id = get_id()
t = time.time()
# run up the cpu:
cnt = 0
for _ in range(R * 10 if i <= 3 else R):
cnt += 1
return i, id, time.time() - t
if __name__ == '__main__':
p = Pool(4)
# 4 tasks per process:
results = p.map(worker, range(16), chunksize=4) # first process gets arguments: 0, 1, 2, 3
for result in results:
i, id, elapsed_time = result
print(f'i={i}, process id={id}, elapsed time={elapsed_time}')
打印:
i=0, process id=1, elapsed time=6.197998046875
i=1, process id=1, elapsed time=5.889002323150635
i=2, process id=1, elapsed time=5.952000856399536
i=3, process id=1, elapsed time=6.022995948791504
i=4, process id=2, elapsed time=0.6909992694854736
i=5, process id=2, elapsed time=0.8339993953704834
i=6, process id=2, elapsed time=0.5869994163513184
i=7, process id=2, elapsed time=0.7560005187988281
i=8, process id=3, elapsed time=0.7500002384185791
i=9, process id=3, elapsed time=0.7440023422241211
i=10, process id=3, elapsed time=0.7600002288818359
i=11, process id=3, elapsed time=0.7479968070983887
i=12, process id=4, elapsed time=0.7950015068054199
i=13, process id=4, elapsed time=0.7909986972808838
i=14, process id=4, elapsed time=0.8639986515045166
i=15, process id=4, elapsed time=0.7230024337768555
重要说明:我可能说过一些事情是对实际发生的事情的简化。有一个任务输入队列。任务以 chunksize
组块的形式放置在此队列中,池中的进程在空闲时将下一个 chunksize
组从队列中取出进行处理。我在我的图表中暗示这些块在一开始就预先分配给所有进程,但情况不一定如此。在我上面的演示中,我选择了一个 chunksize
,它基本上导致所有块被处理(如果未指定,default chunksize
将是 1)。但有时,如果任务的处理很简单(例如,只是一个 return None
语句),第一个进程甚至有可能获取所有块,这在上面的演示中并非如此。具有包含所有块的单个队列的含义是,当 chunksize
为 1 时,处理器永远不应出现不必要的空闲。
我是新手 python map()
函数实现并行代码
def main_function(sample):
# ......(only input file; calculations; and output file)
if __name__ == "__main__":
list_sample_common = os.listdir('/lustre/scratch/Stat/s1155136154/ONT_Panel2')# WES,ONT_panel, Pacibo_Panel intersection.
list_sample_Pacibo_normal = ['RMH12', 'RMH15','RMH20','RMH25','RMH3.','RMH7.','RMH9.']# normal people sample
list_sample_ONT_cDNA_only = ['RM66T','RM68T','RM77T']
sample = list_sample_common + list_sample_Pacibo_normal + list_sample_ONT_cDNA_only
pool=Pool()
pool.map(main_function,sample)
pool.close()
pool.join()
所以当我第一次在集群上使用它时,子进程是 运行 500% CPU(因为我在集群中应用了 5 个核心)。
然而,一段时间后,只有一个核心运行ning:
所以,这是因为包含输出和输入操作的主函数?而且由于主进程只将短列表传递给子函数,我相信参数大小不会影响速度。
这只是一个有根据的猜测,因为我对 sample
的大小以及您的工作函数 main_function
假设您要传递给 Pool.map
方法的 iterable、sample
的长度为 70,正如您所说,您的池大小是5. map
方法将 70 个任务分成 chunksize
大小的任务组,将这些块分配给池中的 5 个进程中的每一个。如果您没有为 map
方法指定 chunksize
参数,它会根据可迭代对象的大小 (70) 和池 (5) 的大小计算值,如下所示:
def compute_chunksize(iterable_size, pool_size):
chunksize, remainder = divmod(iterable_size, pool_size * 4)
if remainder:
chunksize += 1
return chunksize
因此对于您的值,chunksize
将是 4。因此将有 17 个大小为 4 的任务块和一个较小的第 18 个大小为 2 的任务块分布在 5 个进程中(每列是一个队列池中给定进程的任务):
4 4 4 4 4
4 4 4 4 4
4 4 4 4 4
4 4 2
假设所有任务花费相同的时间来处理,您可以看到在一定时间后,最后 2 个进程将完成分配给它们的 12 个任务,现在处于空闲状态,您将 运行宁只有 60%。最终,第三个进程将完成其任务,您现在 运行 将达到 40%。
但是您可以看到 sample
大小和池大小的正确组合,您可能会遇到这样一种情况,即您只会 运行 一个进程。较大的 chunksize
值会加剧这种情况,这些值旨在减少排队任务所需的共享内存访问次数,但可能导致 CPU 利用率低下。
作为一项实验,尝试重新运行您的程序,为您的 map
调用明确指定一个 chunksize
参数 1。除非任务数量是您的池大小的倍数,并且每个任务都需要相同的时间才能完成,否则您不能指望每个处理器都有一个任务要 运行。 事实上,很少会出现这样的情况,即您有一些 其他 而不是只剩下一个进程 运行 完成最后一项任务。 但这应该会减少只有一个处理器忙碌的时间百分比。但是使用 1 的 chunksize
被认为对于大型迭代来说效率低下。
带有 4 个进程池的演示,其中第一个进程获得所有长运行 任务
此处提交了 16 个任务,chunksize
为 4,池大小为 4,因此第一个进程将前 4 个任务提交给 运行,这些任务被人为设置为 10 次比其他人更长 运行ning。我们 return 一个与子流程关联的标识符,以证明一个特定流程正在处理前 4 个任务:
from multiprocessing import Pool, current_process
import re
import time
def get_id():
m = re.search(r'SpawnPoolWorker-(\d+)', str(current_process()))
return int(m[1])
def worker(i):
R = 10000000
id = get_id()
t = time.time()
# run up the cpu:
cnt = 0
for _ in range(R * 10 if i <= 3 else R):
cnt += 1
return i, id, time.time() - t
if __name__ == '__main__':
p = Pool(4)
# 4 tasks per process:
results = p.map(worker, range(16), chunksize=4) # first process gets arguments: 0, 1, 2, 3
for result in results:
i, id, elapsed_time = result
print(f'i={i}, process id={id}, elapsed time={elapsed_time}')
打印:
i=0, process id=1, elapsed time=6.197998046875
i=1, process id=1, elapsed time=5.889002323150635
i=2, process id=1, elapsed time=5.952000856399536
i=3, process id=1, elapsed time=6.022995948791504
i=4, process id=2, elapsed time=0.6909992694854736
i=5, process id=2, elapsed time=0.8339993953704834
i=6, process id=2, elapsed time=0.5869994163513184
i=7, process id=2, elapsed time=0.7560005187988281
i=8, process id=3, elapsed time=0.7500002384185791
i=9, process id=3, elapsed time=0.7440023422241211
i=10, process id=3, elapsed time=0.7600002288818359
i=11, process id=3, elapsed time=0.7479968070983887
i=12, process id=4, elapsed time=0.7950015068054199
i=13, process id=4, elapsed time=0.7909986972808838
i=14, process id=4, elapsed time=0.8639986515045166
i=15, process id=4, elapsed time=0.7230024337768555
重要说明:我可能说过一些事情是对实际发生的事情的简化。有一个任务输入队列。任务以 chunksize
组块的形式放置在此队列中,池中的进程在空闲时将下一个 chunksize
组从队列中取出进行处理。我在我的图表中暗示这些块在一开始就预先分配给所有进程,但情况不一定如此。在我上面的演示中,我选择了一个 chunksize
,它基本上导致所有块被处理(如果未指定,default chunksize
将是 1)。但有时,如果任务的处理很简单(例如,只是一个 return None
语句),第一个进程甚至有可能获取所有块,这在上面的演示中并非如此。具有包含所有块的单个队列的含义是,当 chunksize
为 1 时,处理器永远不应出现不必要的空闲。