Python's multiprocessing returns more results than tasks were given
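I'm currently trying to use multiprocessing for my simulation runs so that different input values are evaluated at the same time.
I've googled a lot over the past few weeks and come up with something that may not be pretty but (somehow) works. My problem now is that it returns more output than the tasks I gave it, and I don't understand why.
Sometimes each simulation run returns only the one expected value, but in the example below simulation run 5 returns two values where I would expect just one. It also varies which simulation run produces more output than expected. When I increase the number of periods to e.g. 2, it produces 4 output values, and I don't understand why that happens.
Could someone give me a hint on how to change this? I couldn't find an answer and I'm getting frustrated :(
Also, I'm quite new to Python and really enjoying it, so any advice on how to improve my code would be much appreciated :)
Here is the simplified code I'm using: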
import numpy as np
from multiprocessing import Process, Queue
import multiprocessing
from itertools import repeat

class Simulation(Process):
    Nr = 1
    Mean = 5
    StdDev = 3
    Periods = 10
    Result = []

    def Generate_Value(self):
        GeneratedValue = max(int(round(np.random.normal(self.Mean, self.StdDev), 0)), 0)
        return GeneratedValue

    def runSimulation(self):
        for i in range(self.Periods):
            self.Result.append(self.Generate_Value())
        return self.Result

def worker(Mean, stdDev, Periods, Nr, queue):
    Sim = Simulation()
    Sim.Nr = Nr
    Sim.Periods = Periods
    Sim.Mean = Mean
    Sim.StdDev = stdDev
    Results = Sim.runSimulation()
    queue.put(Results)
    print("Simulation run " + str(Nr) + " done with a result of " + str(Results)
          + " (Input: mean: " + str(Mean) + ", std. dev.: " + str(stdDev) + ")")

if __name__ == '__main__':
    m = multiprocessing.Manager()
    queue = m.Queue()
    CPUS = multiprocessing.cpu_count()  # CPUS = 8
    WORKERS = multiprocessing.Pool(processes=CPUS)
    Mean = [50, 60, 70, 80, 90]
    StdDev = [10, 10, 10, 10, 10]
    Periods = 1
    Nr = list(range(1, len(Mean) + 1))
    WORKERS.starmap(worker, zip(Mean, StdDev, repeat(Periods), Nr, repeat(queue)))
    WORKERS.close()
    WORKERS.join()
    FinalSimulationResults = []
    for i in range(len(Mean)):
        FinalSimulationResults.append(queue.get())
    print(FinalSimulationResults)
This results in, for example:
Simulation run 1 done with a result of [23] (Input: mean: 50, std. dev.: 10)
Simulation run 2 done with a result of [55] (Input: mean: 60, std. dev.: 10)
Simulation run 3 done with a result of [64] (Input: mean: 70, std. dev.: 10)
Simulation run 5 done with a result of [23, 89] (Input: mean: 90, std. dev.: 10)
Simulation run 4 done with a result of [78] (Input: mean: 80, std. dev.: 10)
[[23], [55], [64], [23, 89], [78]]
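It works now :). It's not as fast as I expected (only about 2x faster on 8 cores), but for everyone who might run into the same problem, here is my working code: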
import time  # needed for time.time() below
import numpy as np
from multiprocessing import Process, Queue
import multiprocessing
from itertools import repeat

class Simulation():
    def __init__(self, Nr, Mean, Std_dev, Periods):
        # Instance attributes: every Simulation gets its own Result list.
        self.Result = []
        self.Nr = Nr
        self.Mean = Mean
        self.StdDev = Std_dev
        self.Periods = Periods

    def Generate_Value(self):
        GeneratedValue = max(int(round(np.random.normal(self.Mean, self.StdDev), 0)), 0)
        return GeneratedValue

    def runSimulation(self):
        for i in range(self.Periods):
            self.Result.append(self.Generate_Value())
        return self.Result

def worker(Mean, stdDev, Periods, Nr, queue):
    Sim = Simulation(Nr=Nr, Mean=Mean, Std_dev=stdDev, Periods=Periods)
    Results = Sim.runSimulation()
    queue.put(Results)
    print("Simulation run " + str(Nr) + " done with a result of " + str(Results)
          + " (Input: mean: " + str(Mean) + ", std. dev.: " + str(stdDev) + ")")

if __name__ == '__main__':
    start = time.time()
    m = multiprocessing.Manager()
    queue = m.Queue()
    CPUS = multiprocessing.cpu_count()
    WORKERS = multiprocessing.Pool(processes=CPUS)
    Mean = [50, 60, 70, 80, 90]
    StdDev = [10, 10, 10, 10, 10]
    Periods = 100
    Nr = list(range(1, len(Mean) + 1))
    WORKERS.starmap(worker, zip(Mean, StdDev, repeat(Periods), Nr, repeat(queue)))
    WORKERS.close()
    WORKERS.join()
    FinalSimulationResults = []
    for i in range(len(Mean)):
        FinalSimulationResults.append(queue.get())
    print(FinalSimulationResults)
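The way you assign the attributes makes them class attributes, so they are shared between all instances of the class. In your case this doesn't show up immediately, because each process holds only one instance of the class at a time and the class object itself is not shared between processes. But if a worker finishes early enough to pick up another task, the class object is reused and the class attributes work "as expected": the Result list still contains the values appended by the previous task.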
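A minimal single-process sketch of that sharing, with no multiprocessing involved. The class names `SharedResult` and `OwnResult` are made up for illustration and are not part of the question's code:

```python
class SharedResult:
    # Class attribute: one list object shared by every instance.
    result = []

    def add(self, value):
        self.result.append(value)  # mutates the shared class-level list
        return self.result


class OwnResult:
    def __init__(self):
        # Instance attribute: a fresh list for each instance.
        self.result = []

    def add(self, value):
        self.result.append(value)
        return self.result


first = SharedResult()
second = SharedResult()
first.add(23)
shared = second.add(89)  # the list already holds 23 from `first`

third = OwnResult()
fourth = OwnResult()
third.add(23)
own = fourth.add(89)

print(shared)  # [23, 89]
print(own)     # [89]
```

Note that a plain assignment such as `Sim.Nr = Nr` does not have this problem: it creates a new instance attribute that shadows the class attribute. Only in-place mutation, like `append` on the shared list, leaks from one instance to the next.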
To avoid this, you should always assign instance attributes (i.e. attributes that should differ from instance to instance) in the __init__ function:
class Simulation(Process):
    def __init__(self, nr, mean, std_dev, periods):
        super().__init__()  # initialise the Process base class
        self.nr = nr
        self.mean = mean
        self.std_dev = std_dev
        self.periods = periods
        self.result = []  # a new list for every instance

    def Generate_Value(self):
        GeneratedValue = max(int(round(np.random.normal(self.mean, self.std_dev), 0)), 0)
        return GeneratedValue

    def runSimulation(self):
        for i in range(self.periods):
            self.result.append(self.Generate_Value())
        return self.result
For details, see the documentation.
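That said, I don't think you should be using the Process class the way you are. Pool handles process creation for you automatically; you only need to tell it what to do. So, rewriting your code: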
import multiprocessing
from itertools import repeat

import numpy as np

def task(nr, mean, std_dev, periods):
    # Build a fresh list inside the task so nothing carries over between runs.
    results = []
    for i in range(periods):
        results.append(max(int(round(np.random.normal(mean, std_dev), 0)), 0))
    return results

if __name__ == '__main__':
    cpu_count = multiprocessing.cpu_count()  # e.g. 8
    pool = multiprocessing.Pool(processes=cpu_count)
    Mean = [50, 60, 70, 80, 90]
    StdDev = [10, 10, 10, 10, 10]
    Periods = 1
    Nr = list(range(1, len(Mean) + 1))
    # starmap returns each task's return value in input order, so no queue is needed.
    FinalSimulationResults = pool.starmap(task, zip(Nr, Mean, StdDev, repeat(Periods)))
    pool.close()
    pool.join()
    print(FinalSimulationResults)
This should work (untested).
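To see why the extra values in the question only appear when a pool worker handles more than one task, here is a small sketch that forces the reuse by limiting the pool to a single worker process. The names `worker` and `run_demo` and the input values are illustrative assumptions; the `Simulation` class is a stripped-down stand-in that keeps only the class-level `Result` list from the question:

```python
import multiprocessing
import os


class Simulation:
    Result = []  # class attribute: lives as long as the worker process does

    def run(self, value):
        self.Result.append(value)
        return list(self.Result)  # copy, so each task reports a snapshot


def worker(value):
    # Report which process handled the task alongside its result.
    return os.getpid(), Simulation().run(value)


def run_demo(processes):
    with multiprocessing.Pool(processes=processes) as pool:
        return pool.map(worker, [23, 89, 64])


if __name__ == "__main__":
    # With one worker, every task runs in the same process,
    # so the class-level list keeps growing from task to task.
    for pid, result in run_demo(processes=1):
        print(pid, result)
```

With a single worker the three tasks report `[23]`, `[23, 89]` and `[23, 89, 64]`, all under the same process ID. With more workers than tasks the leak may not show up at all, which matches the "sometimes" in the question.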