与 multiprocessing.Pool 共享一个计数器
Sharing a counter with multiprocessing.Pool
我想使用 multiprocessing.Value
+ multiprocessing.Lock
在不同的进程之间共享一个计数器。例如:
import itertools as it
import multiprocessing
def func(x, val, lock):
for i in range(x):
i ** 2
with lock:
val.value += 1
print('counter incremented to:', val.value)
if __name__ == '__main__':
v = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
with multiprocessing.Pool() as pool:
pool.starmap(func, ((i, v, lock) for i in range(25)))
print(counter.value())
这将引发以下异常:
RuntimeError: Synchronized objects should only be shared between
processes through inheritance
我最困惑的是一个相关的(虽然不完全相似)模式与 multiprocessing.Process()
:
一起工作
if __name__ == '__main__':
v = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
procs = [multiprocessing.Process(target=func, args=(i, v, lock))
for i in range(25)]
for p in procs: p.start()
for p in procs: p.join()
现在,我认识到这是两个截然不同的事情:
- 第一个示例使用数量等于
cpu_count()
的工作进程,并在它们之间拆分一个可迭代的 range(25)
- 第二个示例创建了 25 个工作进程和任务,每个工作进程和任务都有一个输入
也就是说:我如何以这种方式与 pool.starmap()
(或 pool.map()
)共享实例?
我见过类似的问题here, here, and here,但这些方法似乎并不适合 .map()
/.starmap()
,不管 Value
是否使用 ctypes.c_int
.
我意识到这种方法在技术上是可行的:
def func(x):
for i in range(x):
i ** 2
with lock:
v.value += 1
print('counter incremented to:', v.value)
v = None
lock = None
def set_global_counter_and_lock():
"""Egh ... """
global v, lock
if not any((v, lock)):
v = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
if __name__ == '__main__':
# Each worker process will call `initializer()` when it starts.
with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
pool.map(func, range(25))
这真的是解决此问题的最佳做法吗?
使用 Pool
时得到的 RuntimeError
是因为池方法的参数在通过(池内部)队列发送到工作进程之前被腌制。
您尝试使用哪种池方法在这里无关紧要。当您只使用 Process
时不会发生这种情况,因为不涉及队列。您可以使用 pickle.dumps(multiprocessing.Value('i', 0))
.
重现错误
你的最后一个代码片段并不像你想象的那样工作。您没有共享一个Value
,您正在为每个子进程重新创建独立的计数器。
如果您使用的是 Unix 并使用默认启动方法 "fork",您只需 not 将共享对象作为参数传递到池中即可-方法。
您的子进程将通过分叉继承全局变量。使用 process-start-methods "spawn"(默认 Windows 和 macOS with Python 3.8+)或 "forkserver",您必须在 Pool
期间使用 initializer
实例化,让子进程继承共享对象。
注意,这里不需要额外的 multiprocessing.Lock
,因为 multiprocessing.Value
默认带有一个内部的,您可以使用。
import os
from multiprocessing import Pool, Value #, set_start_method
def func(x):
for i in range(x):
assert i == i
with cnt.get_lock():
cnt.value += 1
print(f'{os.getpid()} | counter incremented to: {cnt.value}\n')
def init_globals(counter):
global cnt
cnt = counter
if __name__ == '__main__':
# set_start_method('spawn')
cnt = Value('i', 0)
iterable = [10000 for _ in range(10)]
with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
pool.map(func, iterable)
assert cnt.value == 100000
可能还值得注意的是,您不需要在所有情况下都共享 计数器。
如果您只需要跟踪某件事发生的频率,一个选择是在计算期间保留单独的工作人员本地计数器,您在最后总结。
对于在并行计算本身期间不需要同步的频繁计数器更新,这可能会显着提高性能。
我想使用 multiprocessing.Value
+ multiprocessing.Lock
在不同的进程之间共享一个计数器。例如:
import itertools as it
import multiprocessing
def func(x, val, lock):
for i in range(x):
i ** 2
with lock:
val.value += 1
print('counter incremented to:', val.value)
if __name__ == '__main__':
v = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
with multiprocessing.Pool() as pool:
pool.starmap(func, ((i, v, lock) for i in range(25)))
print(counter.value())
这将引发以下异常:
RuntimeError: Synchronized objects should only be shared between processes through inheritance
我最困惑的是一个相关的(虽然不完全相似)模式与 multiprocessing.Process()
:
if __name__ == '__main__':
v = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
procs = [multiprocessing.Process(target=func, args=(i, v, lock))
for i in range(25)]
for p in procs: p.start()
for p in procs: p.join()
现在,我认识到这是两个截然不同的事情:
- 第一个示例使用数量等于
cpu_count()
的工作进程,并在它们之间拆分一个可迭代的range(25)
- 第二个示例创建了 25 个工作进程和任务,每个工作进程和任务都有一个输入
也就是说:我如何以这种方式与 pool.starmap()
(或 pool.map()
)共享实例?
我见过类似的问题here, here, and here,但这些方法似乎并不适合 .map()
/.starmap()
,不管 Value
是否使用 ctypes.c_int
.
我意识到这种方法在技术上是可行的:
def func(x):
for i in range(x):
i ** 2
with lock:
v.value += 1
print('counter incremented to:', v.value)
v = None
lock = None
def set_global_counter_and_lock():
"""Egh ... """
global v, lock
if not any((v, lock)):
v = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
if __name__ == '__main__':
# Each worker process will call `initializer()` when it starts.
with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
pool.map(func, range(25))
这真的是解决此问题的最佳做法吗?
使用 Pool
时得到的 RuntimeError
是因为池方法的参数在通过(池内部)队列发送到工作进程之前被腌制。
您尝试使用哪种池方法在这里无关紧要。当您只使用 Process
时不会发生这种情况,因为不涉及队列。您可以使用 pickle.dumps(multiprocessing.Value('i', 0))
.
你的最后一个代码片段并不像你想象的那样工作。您没有共享一个Value
,您正在为每个子进程重新创建独立的计数器。
如果您使用的是 Unix 并使用默认启动方法 "fork",您只需 not 将共享对象作为参数传递到池中即可-方法。
您的子进程将通过分叉继承全局变量。使用 process-start-methods "spawn"(默认 Windows 和 macOS with Python 3.8+)或 "forkserver",您必须在 Pool
期间使用 initializer
实例化,让子进程继承共享对象。
注意,这里不需要额外的 multiprocessing.Lock
,因为 multiprocessing.Value
默认带有一个内部的,您可以使用。
import os
from multiprocessing import Pool, Value #, set_start_method
def func(x):
for i in range(x):
assert i == i
with cnt.get_lock():
cnt.value += 1
print(f'{os.getpid()} | counter incremented to: {cnt.value}\n')
def init_globals(counter):
global cnt
cnt = counter
if __name__ == '__main__':
# set_start_method('spawn')
cnt = Value('i', 0)
iterable = [10000 for _ in range(10)]
with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
pool.map(func, iterable)
assert cnt.value == 100000
可能还值得注意的是,您不需要在所有情况下都共享 计数器。 如果您只需要跟踪某件事发生的频率,一个选择是在计算期间保留单独的工作人员本地计数器,您在最后总结。 对于在并行计算本身期间不需要同步的频繁计数器更新,这可能会显着提高性能。