在 class 中使用多处理模块
Using multiprocessing module in class
我有以下程序,我想使用多处理模块。它使用外部文件,其中我从另一个文件调用 PSO class。 costfunc
是来自另一个文件的函数,其他参数只是变量。
Swarm
是一个包含与 ps
的值一样多的对象的列表,每个对象都有多个属性,需要在每次迭代时更新。
继 Hannu 实施 multiprocessing.pool 之后,它正在运行,但是它比 运行ning 顺序花费更多的时间。
如果您能告诉我发生这种情况的原因是什么以及我怎样才能使它 运行 更快,我将不胜感激?
# IMPORT PACKAGES -----------------------------------------------------------+
import random
import numpy as np
# IMPORT FILES --------------------------------------------------------------+
from Reducer import initial
# Particle Class ------------------------------------------------------------+
class Particle:
def __init__(self,D,bounds_x,bounds_v):
self.Position_i = [] # particle position
self.Velocity_i = [] # particle velocity
self.Cost_i = -1 # cost individual
self.Position_Best_i = [] # best position individual
self.Cost_Best_i = -1 # best cost individual
self.Constraint_Best_i = [] # best cost individual contraints
self.Constraint_i = [] # constraints individual
self.Penalty_i = -1 # constraints individual
x0,v0 = initial(D,bounds_x,bounds_v)
for i in range(0,D):
self.Velocity_i.append(v0[i])
self.Position_i.append(x0[i])
# evaluate current fitness
def evaluate(self,costFunc,i):
self.Cost_i, self.Constraint_i,self.Penalty_i = costFunc(self.Position_i,i)
# check to see if the current position is an individual best
if self.Cost_i < self.Cost_Best_i or self.Cost_Best_i == -1:
self.Position_Best_i = self.Position_i
self.Cost_Best_i = self.Cost_i
self.Constraint_Best_i = self.Constraint_i
self.Penalty_Best_i = self.Penalty_i
return self
def proxy(gg, costf, i):
print(gg.evaluate(costf, i))
# Swarm Class ---------------------------------------------------------------+
class PSO():
def __init__(self,costFunc,bounds_x,bounds_v,ps,D,maxiter):
self.Cost_Best_g = -1 # Best Cost for Group
self.Position_Best_g = [] # Best Position for Group
self.Constraint_Best_g = []
self.Penalty_Best_g = -1
# Establish Swarm
Swarm = []
for i in range(0,ps):
Swarm.append(Particle(D,bounds_x,bounds_v))
# Begin optimization Loop
i = 1
self.Evol = []
while i <= maxiter:
pool = multiprocessing.Pool(processes = 4)
results = pool.map_async(partial(proxy, costf = costFunc, i=i), Swarm)
pool.close()
pool.join()
Swarm = results.get()
if Swarm[j].Cost_i< self.Cost_Best_g or self.Cost_Best_g == -1:
self.Position_Best_g = list(Swarm[j].Position_i)
self.Cost_Best_g = float(Swarm[j].Cost_i)
self.Constraint_Best_g = list(Swarm[j].Constraint_i)
self.Penalty_Best_g = float(Swarm[j].Penalty_i)
self.Evol.append(self.Cost_Best_g)
i += 1
您需要一个代理函数来执行函数调用,并且由于您需要向函数传递参数,因此您还需要 partial
。考虑一下:
from time import sleep
from multiprocessing import Pool
from functools import partial
class Foo:
def __init__(self, a):
self.a = a
self.b = None
def evaluate(self, CostFunction, i):
xyzzy = CostFunction(i)
sleep(0.01)
self.b = self.a*xyzzy
return self
def CostFunc(i):
return i*i
def proxy(gg, costf, i):
return gg.evaluate(costf, i)
def main():
Swarm = []
for i in range(0,10):
nc = Foo(i)
Swarm.append(nc)
p = Pool()
for i in range(100,102):
results = p.map_async(partial(proxy, costf=CostFunc, i=i), Swarm)
p.close()
p.join()
Swarm = []
for a in results.get():
Swarm.append(a)
for s in Swarm:
print (s.b)
main()
这会创建一个 Swarm
对象列表,每个对象中的 evaluate
就是您需要调用的函数。然后我们有参数(CostFunc 和代码中的整数)。
我们现在将使用 Pool.map_async
将您的 Swarm 列表映射到您的池中。这从您的 Swarm 列表中为每个工作人员提供了一个 Foo
的实例,并且我们有一个 proxy
函数实际调用 evaluate()
。
然而,由于apply_async
只将对象从iterable发送到函数,而不是使用proxy
作为目标函数到pool,我们使用partial
创建目标传递 "fixed" 参数的函数。
而且您显然想要取回修改后的对象,这需要另一个技巧。如果在 Pool 进程中修改目标对象,它只是修改本地副本,并在处理完成后立即将其丢弃。无论如何,子进程都无法修改主进程内存(反之亦然),这会导致分段错误。
相反,在修改对象后,我们 return self
。当您的池完成其工作时,我们将丢弃旧的 Swarm
并从结果对象中重新组合它。
我有以下程序,我想使用多处理模块。它使用外部文件,其中我从另一个文件调用 PSO class。 costfunc
是来自另一个文件的函数,其他参数只是变量。
Swarm
是一个包含与 ps
的值一样多的对象的列表,每个对象都有多个属性,需要在每次迭代时更新。
继 Hannu 实施 multiprocessing.pool 之后,它正在运行,但是它比 运行ning 顺序花费更多的时间。
如果您能告诉我发生这种情况的原因是什么以及我怎样才能使它 运行 更快,我将不胜感激?
# IMPORT PACKAGES -----------------------------------------------------------+
import random
import numpy as np
# IMPORT FILES --------------------------------------------------------------+
from Reducer import initial
# Particle Class ------------------------------------------------------------+
class Particle:
def __init__(self,D,bounds_x,bounds_v):
self.Position_i = [] # particle position
self.Velocity_i = [] # particle velocity
self.Cost_i = -1 # cost individual
self.Position_Best_i = [] # best position individual
self.Cost_Best_i = -1 # best cost individual
self.Constraint_Best_i = [] # best cost individual contraints
self.Constraint_i = [] # constraints individual
self.Penalty_i = -1 # constraints individual
x0,v0 = initial(D,bounds_x,bounds_v)
for i in range(0,D):
self.Velocity_i.append(v0[i])
self.Position_i.append(x0[i])
# evaluate current fitness
def evaluate(self,costFunc,i):
self.Cost_i, self.Constraint_i,self.Penalty_i = costFunc(self.Position_i,i)
# check to see if the current position is an individual best
if self.Cost_i < self.Cost_Best_i or self.Cost_Best_i == -1:
self.Position_Best_i = self.Position_i
self.Cost_Best_i = self.Cost_i
self.Constraint_Best_i = self.Constraint_i
self.Penalty_Best_i = self.Penalty_i
return self
def proxy(gg, costf, i):
print(gg.evaluate(costf, i))
# Swarm Class ---------------------------------------------------------------+
class PSO():
def __init__(self,costFunc,bounds_x,bounds_v,ps,D,maxiter):
self.Cost_Best_g = -1 # Best Cost for Group
self.Position_Best_g = [] # Best Position for Group
self.Constraint_Best_g = []
self.Penalty_Best_g = -1
# Establish Swarm
Swarm = []
for i in range(0,ps):
Swarm.append(Particle(D,bounds_x,bounds_v))
# Begin optimization Loop
i = 1
self.Evol = []
while i <= maxiter:
pool = multiprocessing.Pool(processes = 4)
results = pool.map_async(partial(proxy, costf = costFunc, i=i), Swarm)
pool.close()
pool.join()
Swarm = results.get()
if Swarm[j].Cost_i< self.Cost_Best_g or self.Cost_Best_g == -1:
self.Position_Best_g = list(Swarm[j].Position_i)
self.Cost_Best_g = float(Swarm[j].Cost_i)
self.Constraint_Best_g = list(Swarm[j].Constraint_i)
self.Penalty_Best_g = float(Swarm[j].Penalty_i)
self.Evol.append(self.Cost_Best_g)
i += 1
您需要一个代理函数来执行函数调用,并且由于您需要向函数传递参数,因此您还需要 partial
。考虑一下:
from time import sleep
from multiprocessing import Pool
from functools import partial
class Foo:
def __init__(self, a):
self.a = a
self.b = None
def evaluate(self, CostFunction, i):
xyzzy = CostFunction(i)
sleep(0.01)
self.b = self.a*xyzzy
return self
def CostFunc(i):
return i*i
def proxy(gg, costf, i):
return gg.evaluate(costf, i)
def main():
Swarm = []
for i in range(0,10):
nc = Foo(i)
Swarm.append(nc)
p = Pool()
for i in range(100,102):
results = p.map_async(partial(proxy, costf=CostFunc, i=i), Swarm)
p.close()
p.join()
Swarm = []
for a in results.get():
Swarm.append(a)
for s in Swarm:
print (s.b)
main()
这会创建一个 Swarm
对象列表,每个对象中的 evaluate
就是您需要调用的函数。然后我们有参数(CostFunc 和代码中的整数)。
我们现在将使用 Pool.map_async
将您的 Swarm 列表映射到您的池中。这从您的 Swarm 列表中为每个工作人员提供了一个 Foo
的实例,并且我们有一个 proxy
函数实际调用 evaluate()
。
然而,由于apply_async
只将对象从iterable发送到函数,而不是使用proxy
作为目标函数到pool,我们使用partial
创建目标传递 "fixed" 参数的函数。
而且您显然想要取回修改后的对象,这需要另一个技巧。如果在 Pool 进程中修改目标对象,它只是修改本地副本,并在处理完成后立即将其丢弃。无论如何,子进程都无法修改主进程内存(反之亦然),这会导致分段错误。
相反,在修改对象后,我们 return self
。当您的池完成其工作时,我们将丢弃旧的 Swarm
并从结果对象中重新组合它。