Python 多处理拒绝拆分我的可迭代对象
Python multiprocessing refuses to split up my iterable
我正在尝试编写一个多进程程序,看起来我已经完成了,并且我已经使用系统监视器应用程序验证了 Python 进程已创建。但问题是,似乎几乎所有这些都没有在现实中使用。在我的程序中,我试图将音频文件分成块,所以我不认为它是一个“微不足道的计算负载”,正如我在其他线程中读到的那样。
为我展示相同行为的最小示例:
import os, random, time
from tqdm import tqdm
from multiprocessing import Pool
def myfunc(myli):
print(len(myli))
for item in myli:
x = item*item*item
time.sleep(2)
return
mylist = [random.randint(1,10000) for _ in range (0, 19999)]
with Pool(processes=8) as p, tqdm(total=len(mylist)) as pbar:
for _ in p.imap_unordered(func=myfunc, iterable=(mylist,)):
pbar.update()
如你所见,我在使用的func
里面加了一个print()
,每次打印整个数组的长度。好像没有发生分裂。
我曾经天真地尝试过使用不同的块大小并删除 tqdm(好像它发挥了任何作用)。
如果你能给我任何见解,我将不胜感激。
代码正在执行您告诉它要做的事情:您传递了一个长度为 1 的可迭代对象,一个包含单个项目的元组 (mylist
)。因此它将单个项目传递给单个工作人员进行处理。
但是你不能用 iterable=mylist
代替,因为 myfunc()
期望得到一个序列,而不是一个整数。无论可迭代对象是什么,multiprocessing
一次将一个元素传递给工作人员。 chunksize
与此无关。无论 chunksize
是 1 还是十亿,辅助函数一次只看到一个元素。 chunksize
是一项幕后优化,纯粹是为了减少所需的昂贵的进程间通信调用次数。
如果您想将序列拆分为块并使用需要块的工作函数,那么您必须自己进行“分块”。例如,添加
# Generate slices of `xs` of length (at most) `n`.
def chunkit(xs, n):
start = 0
while start < len(xs):
yield xs[start : start + n]
start += n
并通过iterable=chunkit(mylist, 40)
。那么所有8个进程都会很忙。一个人将在 mylist[0:40]
上工作,另一个人将在 mylist[40:80]
上工作,另一个人将在 mylist[80:120]
上工作,依此类推,直到 mylist
耗尽。
我正在尝试编写一个多进程程序,看起来我已经完成了,并且我已经使用系统监视器应用程序验证了 Python 进程已创建。但问题是,似乎几乎所有这些都没有在现实中使用。在我的程序中,我试图将音频文件分成块,所以我不认为它是一个“微不足道的计算负载”,正如我在其他线程中读到的那样。
为我展示相同行为的最小示例:
import os, random, time
from tqdm import tqdm
from multiprocessing import Pool
def myfunc(myli):
print(len(myli))
for item in myli:
x = item*item*item
time.sleep(2)
return
mylist = [random.randint(1,10000) for _ in range (0, 19999)]
with Pool(processes=8) as p, tqdm(total=len(mylist)) as pbar:
for _ in p.imap_unordered(func=myfunc, iterable=(mylist,)):
pbar.update()
如你所见,我在使用的func
里面加了一个print()
,每次打印整个数组的长度。好像没有发生分裂。
我曾经天真地尝试过使用不同的块大小并删除 tqdm(好像它发挥了任何作用)。
如果你能给我任何见解,我将不胜感激。
代码正在执行您告诉它要做的事情:您传递了一个长度为 1 的可迭代对象,一个包含单个项目的元组 (mylist
)。因此它将单个项目传递给单个工作人员进行处理。
但是你不能用 iterable=mylist
代替,因为 myfunc()
期望得到一个序列,而不是一个整数。无论可迭代对象是什么,multiprocessing
一次将一个元素传递给工作人员。 chunksize
与此无关。无论 chunksize
是 1 还是十亿,辅助函数一次只看到一个元素。 chunksize
是一项幕后优化,纯粹是为了减少所需的昂贵的进程间通信调用次数。
如果您想将序列拆分为块并使用需要块的工作函数,那么您必须自己进行“分块”。例如,添加
# Generate slices of `xs` of length (at most) `n`.
def chunkit(xs, n):
start = 0
while start < len(xs):
yield xs[start : start + n]
start += n
并通过iterable=chunkit(mylist, 40)
。那么所有8个进程都会很忙。一个人将在 mylist[0:40]
上工作,另一个人将在 mylist[40:80]
上工作,另一个人将在 mylist[80:120]
上工作,依此类推,直到 mylist
耗尽。