Python multiprocessing.Pool 和参数酸洗
Python multiprocessing.Pool and argument pickling
考虑以下示例:
import multiprocessing as mp
def job(l):
l.append(1)
return l
if __name__ == "__main__":
pool = mp.Pool(1)
my_list = []
out = pool.map(job, [my_list for i in range(5)])
pool.close()
pool.join()
print(out)
调用 pool.map 时,我希望在调用作业后对参数进行 pickle 和 unpickled(因此每次都重新创建)。然而,观察到的输出是
[[1, 1], [1, 1], [1, 1], [1, 1], [1]]
有人可以解释一下这是怎么回事吗?我希望输出
是五个 [1] 或 [[1], [1, 1], ..., [1, 1, 1, 1, 1]] 的列表,两者都不是。
对于不同数量的进程,这会产生不同的结果,这意味着您正在做一些进程不安全的事情;在这种情况下,在(可能)多个进程中对本机列表进行操作。
我不太清楚你想要达到什么目的,但这至少表现一致:
from multiprocessing import Pool, Manager
def job(l):
l.append(1)
return l
if __name__ == "__main__":
manager = Manager()
for proc_count in range(1, 6):
print(proc_count)
pool = Pool(proc_count)
my_list = manager.list()
out = pool.map(job, [my_list for i in range(5)])
pool.close()
pool.join()
print(list(list(o) for o in out))
如果这不是您想要的,忘记管理器,删除 my_list
并使用 [list() for i in range(5)]
也会导致一致但不同的行为。
pool.map
的 chunksize
参数是造成您混淆的原因。显然,它会选择为您的设置自动设置 chunksize=2,因为您还通过显式设置 chunksize=2
获得观察到的输出。
使用 chunksize=1
你会得到 [[1], [1], [1], [1], [1]]
chunksize=3
你会得到 [[1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1], [1, 1]]
.
如果你用打印扩展你的代码,你可以看看会发生什么:
import multiprocessing as mp
def job(l):
print(f'before append {l}')
l.append(1)
print(f'after append {l}')
return l
if __name__ == "__main__":
pool = mp.Pool(1)
my_list = []
out = pool.map(job, [my_list for _ in range(5)], chunksize=2)
pool.close()
pool.join()
print(out)
这会给你这个输出:
before append []
after append [1]
before append [1]
after append [1, 1]
before append []
after append [1]
before append [1]
after append [1, 1]
before append []
after append [1]
[[1, 1], [1, 1], [1, 1], [1, 1], [1]]
Process finished with exit code 0
您可以看到,"before append" 仅从空列表开始三次,而不是您预期的五次。那是因为 chunksize=2
和迭代中的五个项目你有 5 / 2 = 2.5 个任务。一半的任务是不可能的,所以这就是为什么你最终有 3 个任务:2 个任务有两个项目块,一个任务有一个项目块。
现在对于前两个任务,您的函数的第一次执行 job
获取未经处理的空列表并附加 1
。然后第二次执行获得与第一次执行刚刚修改的相同列表,因为您的项目只是对该任务中相同列表的引用。第二次执行也会改变第一次执行的结果,因为两者修改的是同一个底层对象。第二次执行后,任务完成,两次执行的结果 [[1, 1], [1, 1]] 被发送回父级。正如我们所说,这发生在前两个任务中。
第三个任务只执行了一次 job
并且它的结果没有被第二个任务修改所以结果只有 [1].
如果您在代码末尾添加 for obj in out: print(id(obj))
,您将看到,结果中的三个单独列表获得了三个不同的 ID,已经构建了尽可能多的任务来处理您的可迭代对象(CPython ).:
140584841382600
140584841382600
140584841383432
140584841383432
140584841383368
考虑以下示例:
import multiprocessing as mp
def job(l):
l.append(1)
return l
if __name__ == "__main__":
pool = mp.Pool(1)
my_list = []
out = pool.map(job, [my_list for i in range(5)])
pool.close()
pool.join()
print(out)
调用 pool.map 时,我希望在调用作业后对参数进行 pickle 和 unpickled(因此每次都重新创建)。然而,观察到的输出是
[[1, 1], [1, 1], [1, 1], [1, 1], [1]]
有人可以解释一下这是怎么回事吗?我希望输出 是五个 [1] 或 [[1], [1, 1], ..., [1, 1, 1, 1, 1]] 的列表,两者都不是。
对于不同数量的进程,这会产生不同的结果,这意味着您正在做一些进程不安全的事情;在这种情况下,在(可能)多个进程中对本机列表进行操作。
我不太清楚你想要达到什么目的,但这至少表现一致:
from multiprocessing import Pool, Manager
def job(l):
l.append(1)
return l
if __name__ == "__main__":
manager = Manager()
for proc_count in range(1, 6):
print(proc_count)
pool = Pool(proc_count)
my_list = manager.list()
out = pool.map(job, [my_list for i in range(5)])
pool.close()
pool.join()
print(list(list(o) for o in out))
如果这不是您想要的,忘记管理器,删除 my_list
并使用 [list() for i in range(5)]
也会导致一致但不同的行为。
pool.map
的 chunksize
参数是造成您混淆的原因。显然,它会选择为您的设置自动设置 chunksize=2,因为您还通过显式设置 chunksize=2
获得观察到的输出。
使用 chunksize=1
你会得到 [[1], [1], [1], [1], [1]]
chunksize=3
你会得到 [[1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1], [1, 1]]
.
如果你用打印扩展你的代码,你可以看看会发生什么:
import multiprocessing as mp
def job(l):
print(f'before append {l}')
l.append(1)
print(f'after append {l}')
return l
if __name__ == "__main__":
pool = mp.Pool(1)
my_list = []
out = pool.map(job, [my_list for _ in range(5)], chunksize=2)
pool.close()
pool.join()
print(out)
这会给你这个输出:
before append []
after append [1]
before append [1]
after append [1, 1]
before append []
after append [1]
before append [1]
after append [1, 1]
before append []
after append [1]
[[1, 1], [1, 1], [1, 1], [1, 1], [1]]
Process finished with exit code 0
您可以看到,"before append" 仅从空列表开始三次,而不是您预期的五次。那是因为 chunksize=2
和迭代中的五个项目你有 5 / 2 = 2.5 个任务。一半的任务是不可能的,所以这就是为什么你最终有 3 个任务:2 个任务有两个项目块,一个任务有一个项目块。
现在对于前两个任务,您的函数的第一次执行 job
获取未经处理的空列表并附加 1
。然后第二次执行获得与第一次执行刚刚修改的相同列表,因为您的项目只是对该任务中相同列表的引用。第二次执行也会改变第一次执行的结果,因为两者修改的是同一个底层对象。第二次执行后,任务完成,两次执行的结果 [[1, 1], [1, 1]] 被发送回父级。正如我们所说,这发生在前两个任务中。
第三个任务只执行了一次 job
并且它的结果没有被第二个任务修改所以结果只有 [1].
如果您在代码末尾添加 for obj in out: print(id(obj))
,您将看到,结果中的三个单独列表获得了三个不同的 ID,已经构建了尽可能多的任务来处理您的可迭代对象(CPython ).:
140584841382600
140584841382600
140584841383432
140584841383432
140584841383368