Python Multiprocessing, preinitialization of variables
I am trying to parallelize my code with the multiprocessing module. The code works in two steps. In the first step I initialize a class, which computes and stores several variables that are used in the second step. In the second step the program performs calculations using the previously initialized variables; the variables from the first step are never modified. The computation time of the first step does not matter, but the second step is called several hundred times and currently runs sequentially. Below is a constructed minimal example of the code structure and its output.
import numpy as np
import time
from multiprocessing import Pool

class test:
    def __init__(self):
        self.r = np.ones(10000000)

    def f(self, init):
        summed = 0
        for i in range(0, init):
            summed = summed + i
        return summed

if __name__ == "__main__":
    # first step
    func = test()

    # second step
    # sequential
    start_time = time.time()
    for i in [1000000, 1000000, 1000000, 1000000]:
        func.f(i)
    print('Sequential: ', time.time()-start_time)

    # parallel
    start_time = time.time()
    pool = Pool(processes=None)
    result = pool.starmap(func.f, [[1000000], [1000000], [1000000], [1000000]])
    print('Parallel: ', time.time()-start_time)
Output:

Sequential: 0.2673146724700928
Parallel: 1.5638213157653809
As far as I understand, multiprocessing is slower here because the variable r of the class test has to be transferred to all worker processes. To avoid this, I would need to initialize the class once on each worker before f is launched. Is that possible with multiprocessing? Are there other tools that can do this?
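multiprocessing can do this directly: Pool accepts an initializer callable (plus initargs) that runs once in every worker process when the pool starts, so __init__ runs once per worker instead of r being shipped to the workers for every task. A minimal sketch of that pattern; init_worker, worker_f and the module-level _func global are illustrative names, not from the code above:

import numpy as np
from multiprocessing import Pool

class Test:
    def __init__(self):
        self.r = np.ones(10000000)

    def f(self, init):
        summed = 0
        for i in range(init):
            summed = summed + i
        return summed

_func = None  # one Test instance per worker process

def init_worker():
    global _func
    _func = Test()  # runs exactly once in each worker

def worker_f(init):
    return _func.f(init)

if __name__ == "__main__":
    with Pool(processes=None, initializer=init_worker) as pool:
        result = pool.map(worker_f, [1000000] * 4)

Each worker fills in its own copy of _func during startup, so the subsequent map calls only have to pickle the small integer arguments.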
Simply create a function

def my_function(value):
    func = Test()
    return func.f(value)

or even

def my_function(value):
    return Test().f(value)

and use it

result = pool.starmap(my_function, [[1000000], [1000000], [1000000], [1000000]])

multiprocessing doesn't work with lambdas, so you can't use

pool.starmap(lambda value: Test().f(value), ...)

and it probably doesn't work with functools.partial() either, so you can't use that in place of the lambda.
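The underlying reason is pickling: Pool has to pickle the callable to send it to the worker processes, and a plain lambda cannot be pickled because it has no importable name. A quick illustrative check:

import pickle

try:
    pickle.dumps(lambda value: value)
except Exception as exc:
    # fails with a pickling error: the lambda has no name that the
    # worker process could look up and import
    print(type(exc).__name__, exc)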
Minimal working example
import numpy as np
import time
from multiprocessing import Pool

class Test:  # PEP8: `CamelCaseNames` for classes
    def __init__(self):
        self.r = np.ones(10000000)

    def f(self, init):
        summed = 0
        for i in range(init):
            summed = summed + i
        return summed

def my_function(value):
    func = Test()
    return func.f(value)

if __name__ == "__main__":
    data = [[1000000] for x in range(30)]

    # first step
    func = Test()

    # second step
    # sequential
    start_time = time.time()
    for i in data:
        func.f(*i)  # `*i` like in starmap
    print('Sequential:', time.time()-start_time)

    # parallel 1
    start_time = time.time()
    pool = Pool(processes=None)
    result = pool.starmap(func.f, data)
    print('Parallel 1:', time.time()-start_time)

    # parallel 2
    start_time = time.time()
    pool = Pool(processes=None)
    result = pool.starmap(my_function, data)
    print('Parallel 2:', time.time()-start_time)
My results:
Sequential: 3.0593459606170654
Parallel 1: 5.2161490917205810
Parallel 2: 1.8350131511688232
I have solved this problem using the Pipe function from the multiprocessing module. In the first step I can now initialize the variables and set up the multiprocessing environment; I then use the pipes to transfer the input data.
For "self.r = np.ones(100000000)":

Parallel piped: 0.8008558750152588
Parallel 2: 18.51273012161255

For "self.r = np.ones(10000000)":

Parallel piped: 0.71409010887146
Parallel 2: 1.4551067352294922
import numpy as np
import time
import multiprocessing as mp

class Test:  # PEP8: `CamelCaseNames` for classes
    def __init__(self):
        self.r = np.ones(100000000)

    def f(self, init):
        summed = 0
        for i in range(init):
            summed = summed + i
        return summed

def my_function(value):
    func = Test()
    return func.f(value)

class Connection:
    def __init__(self):
        self.process = {}
        self.parent = {}
        self.child = {}

    def add(self, hub, process, parent_conn, child_conn):
        self.process[hub] = process
        self.parent[hub] = parent_conn
        self.child[hub] = child_conn

def multi_run(child_conn, func, i):
    # worker loop: receive an input, compute, send the result back
    while True:
        init = child_conn.recv()
        data = func.f(init)
        child_conn.send(data)

if __name__ == "__main__":
    N_processes = 4
    func = Test()
    conn = Connection()

    # First step
    for i in range(N_processes):
        parent_conn, child_conn = mp.Pipe()
        # daemon=True so the endless worker loops don't keep the
        # script alive after the main process exits
        process = mp.Process(target=multi_run, args=(child_conn, func, i),
                             daemon=True)
        conn.add(i, process, parent_conn, child_conn)
        process.start()

    start_time = time.time()
    data = [[1000000, x] for x in range(30)]

    # Second step
    for i, j in data:
        conn.parent[j % N_processes].send(i)
    for i, j in data:
        conn.parent[j % N_processes].recv()
    print('Parallel piped:', time.time()-start_time)

    data = [[1000000] for x in range(30)]

    # parallel 2
    start_time = time.time()
    pool = mp.Pool(processes=None)
    result = pool.starmap(my_function, data)
    print('Parallel 2:', time.time()-start_time)
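If you prefer an explicit shutdown over daemon processes, one option is a sentinel: send a value such as None that the worker interprets as "stop". This is a sketch, not part of the solution above, and it assumes a matching check (if init is None: break) added at the top of multi_run's loop:

# explicit shutdown via a sentinel (assumes multi_run breaks on None)
for i in range(N_processes):
    conn.parent[i].send(None)   # tell worker i to stop
    conn.process[i].join()      # wait for it to exit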