Python 多处理进程之间共享数据的问题
Issue with sharing data between Python processes with multiprocessing
我看过几篇关于此的帖子,所以我知道这应该很简单,但我似乎还是没弄明白。我不确定是需要创建一个工作进程池(Pool),还是使用 Queue 类。基本上,我希望能够创建多个进程,每个进程都自主运行(这就是它们都继承自 Agent 超类的原因)。
我想在主循环的随机时间步(tick)上更新每个代理。我在主循环和代理的 run 循环中使用不同取值的 time.sleep,
来模拟不同的处理器速度。
这是我的 Agent 超类:
# Generic base class: each agent runs in its own process (``mpc`` is the
# multiprocessing module). Attributes assigned in the parent are *copied*
# into the child when it starts; later plain-attribute assignments in the
# parent are not visible to the child -- only shared objects such as
# mpc.Event, mpc.Value or mpc.Queue cross the process boundary.
class Agent(mpc.Process):
    def __init__(self):
        """Initialise the process and the event used to request shutdown."""
        mpc.Process.__init__(self)
        # An Event is shared between parent and child, so setting it in the
        # parent (via shutdown()) is observed by the child's run loop.
        self.exit = mpc.Event()

    def run(self):
        """Agent main loop; generally meant to be overridden.

        The default implementation just blocks until shutdown() is called.
        Waiting on the event (instead of spinning on ``is_set()`` with
        ``pass``) avoids burning a whole CPU core while idle.
        """
        self.exit.wait()
        print("You exited!")

    def shutdown(self):
        """Ask the run loop to stop; does not wait for the process to end."""
        print("Shutdown initiated")
        self.exit.set()

    def communicate(self, value):
        """Receive a value; the base implementation only prints it.

        This method executes in whichever process calls it, so calling it
        from the parent does NOT deliver ``value`` to the child process.
        """
        print(value)
特定代理的子类(模拟 HVAC 系统):
class HVAC(Agent):
    """Simulated HVAC system running as an autonomous agent process.

    The measured temperature and humidity are kept in ``mpc.Value`` objects
    (shared memory) instead of plain attributes. Plain attributes are copied
    into the child when it starts, so readings set later by the parent via
    communicate() were never seen by run() in the child.

    Note that the constructor starts the process immediately.
    """

    def __init__(self, dt=70, dh=50.0):
        """Create and start the agent.

        :param dt: desired temperature set point.
        :param dh: desired humidity set point.
        """
        # super() must name *this* class: super(Agent, self) skipped
        # Agent.__init__ entirely. Agent.__init__ also creates self.exit.
        super(HVAC, self).__init__()
        self.__pref_heating = True
        self.__pref_cooling = True
        self.__desired_temperature = dt
        self.__desired_humidity = dh
        # 'd' (C double) so non-integer readings are not truncated.
        self.__meas_temperature = mpc.Value('d', 0)
        self.__meas_humidity = mpc.Value('d', 0.0)
        # Process-local: only updated/printed inside the child's run().
        self.__hvac_status = ""  # heating, cooling, off
        self.start()

    def run(self):  # handle AC or heater on
        """Child-process loop: run one control step every 0.5 s until shutdown."""
        while not self.exit.is_set():
            current = self._control_step()
            print("%s %g" % (self.__hvac_status, current))
            time.sleep(0.5)
        print("HVAC EXITED")

    def _control_step(self):
        """Apply one heat/cool decision to the shared reading; return it."""
        temp = self.__meas_temperature
        # Hold the Value's (re-entrant) lock across the read-modify-write so
        # a concurrent communicate() from the parent is not overwritten.
        with temp.get_lock():
            if temp.value < self.__desired_temperature:
                self.__hvac_status = 'heating'
                temp.value += 1
            elif temp.value > self.__desired_temperature:
                self.__hvac_status = 'cooling'
                temp.value -= 1  # cooling lowers the temperature (was += 1)
            else:
                self.__hvac_status = 'off'
            return temp.value

    def measureTemp(self):
        """Return the current shared temperature reading."""
        return self.__meas_temperature.value

    def measureHumidity(self):
        """Return the current shared humidity reading."""
        return self.__meas_humidity.value

    def communicate(self, updates):
        """Store new readings (keys 'temp' and 'humidity') in shared memory.

        Because the values are shared, the child's run loop sees them.
        """
        self.__meas_temperature.value = updates['temp']
        self.__meas_humidity.value = updates['humidity']
        print("Measured [%d] [%f]" % (self.measureTemp(), self.measureHumidity()))
我的主循环:
if __name__ == "__main__":
    print("Initializing subsystems")
    agents = {}
    agents['HVAC'] = HVAC()  # HVAC.__init__ starts the child process
    try:
        # Run simulation. `args` is expected to come from argument parsing
        # elsewhere in the script (not shown in this snippet).
        timestep = 0
        while timestep < args.timesteps:
            print("Timestep %d" % timestep)
            if timestep % 10 == 0:
                curr_temp = random.randrange(68, 72)
                curr_humidity = random.uniform(40.0, 60.0)
                agents['HVAC'].communicate({'temp': curr_temp, 'humidity': curr_humidity})
            time.sleep(1)
            timestep += 1
    finally:
        # Always stop the child, even on Ctrl-C or an error, so it is not
        # left running after the parent exits.
        agents['HVAC'].shutdown()
        # Wait for the child to finish: checking is_alive() immediately
        # after shutdown() races with the child's last loop iteration.
        agents['HVAC'].join()
    print("HVAC process state: %d" % agents['HVAC'].is_alive())
所以问题是,每当我在主循环中调用 agents['HVAC'].communicate(x) 时,我都能看到值被传递到 HVAC 子类的 run 循环中(它会正确打印接收到的值)。但是,该值始终没有被成功存储。
所以典型的输出如下所示:
Initializing subsystems
Timestep 0
Measured [68] [56.948675]
heating 1
heating 2
Timestep 1
heating 3
heating 4
Timestep 2
heating 5
heating 6
实际上,一旦出现 Measured [68],内部存储的值就应当更新,从而输出 68(而不是 heating 1、heating 2 等)。因此,HVAC 的 self.__meas_temperature 实际上并没有被正确更新。
编辑:经过一些研究,我意识到自己并不真正了解幕后发生了什么。每个子进程都在自己独立的虚拟内存中运行,与以这种方式"共享"的任何数据完全隔离,因此这样传递值是行不通的。我的新问题是:我不确定如何在多个进程之间共享全局值。
我正在研究 Queue 或 JoinableQueue,但不确定如何将 Queue 传入我现有的超类结构中(尤其是 mpc.Process.__init__(self) 这个调用)。
一个附带的问题是我是否可以让多个代理从队列中读取值而不将其从队列中拉出?例如,如果我想与多个代理共享一个 temperature
值,队列是否适用于此?
Pipe v Queue
假设您需要以下内容,这是一个建议的解决方案:
- 控制工人生命周期的集中管理器/主进程
- 工作进程做一些自包含的事情,然后将结果报告给经理和其他进程
不过,在展示之前,我想先说明一点:一般来说,除非你的任务是 CPU 密集型的,否则 multiprocessing 并不是真正合适的选择。这主要是因为它增加了复杂性,使用其他更高层的异步框架可能会更好。另外,你应该使用 Python 3,它要好得多!
话虽如此,借助 multiprocessing.Manager,用 multiprocessing 可以很容易地做到这一点。我是在 Python 3 中完成的,我认为它在 Python 2 中也应该能"直接运行",但我没有验证过。
from ctypes import c_bool
from multiprocessing import Manager, Process, Array, Value
from pprint import pprint
from time import sleep, time
class Agent(Process):
    def __init__(self, name, shared_dictionary, delay=0.5):
        """My take on your Agent.

        Key difference is that I've commonized the run-loop and used
        a shared value to signal when to stop, to demonstrate it.

        :param name: agent name; also the key under which this agent's
            list of ``(timestamp, result)`` pairs is stored.
        :param shared_dictionary: mapping shared by all agents, e.g. a
            ``Manager().dict()`` proxy; used to communicate between processes.
        :param delay: seconds to sleep between run-loop iterations
            (used to simulate slower agents).
        """
        super(Agent, self).__init__()
        self.name = name
        # This is going to be how we communicate between processes.
        self.shared_dictionary = shared_dictionary
        # Create a silo for us to use.
        shared_dictionary[name] = []
        # Shared-memory flag: set by stop() in the parent, read by run()
        # in the child.
        self.should_stop = Value(c_bool, False)
        self.delay = delay

    def get_next_results(self):
        """Produce this iteration's result; subclasses must override."""
        # NotImplementedError is the idiomatic "abstract method" signal and
        # subclasses RuntimeError, so existing `except RuntimeError` callers
        # still catch it. (abc.ABCMeta would enforce this at construction.)
        raise NotImplementedError('Subclasses must implement this')

    def run(self):
        """Loop until stop() is called, appending (timestamp, result) pairs."""
        ii = 0
        while not self.should_stop.value:
            ii += 1
            # debugging / monitoring
            print('%s %s run loop execution %d' % (
                type(self).__name__, self.name, ii))
            next_results = self.get_next_results()
            # With a Manager dict proxy, d[key] returns a *copy* of the list,
            # so an in-place .append() would be lost; `+=` reads, extends and
            # reassigns the key, which writes the new list back.
            self.shared_dictionary[self.name] += [(time(), next_results)]
            sleep(self.delay)

    def stop(self):
        """Request the run loop to exit after its current iteration."""
        self.should_stop.value = True
        print('%s %s stopped' % (type(self).__name__, self.name))
class HVACAgent(Agent):
    """Agent that reports a constant set of HVAC readings each iteration."""

    def get_next_results(self):
        # Stand-in for real work: the same readings on every iteration.
        readings = dict(temperature=5, pressure=7, humidity=9)
        return readings
class DumbReadingAgent(Agent):
    """A dumb agent to demonstrate workers reading other worker values."""

    # Name of the agent whose most recent temperature is read.
    source_name = 'hvac 1'

    def get_next_results(self):
        """Return the source agent's latest temperature, or None if none yet."""
        source_results = self.shared_dictionary.get(self.source_name)
        # Agent.__init__ creates the source's list empty, so it can exist
        # before the source appends its first reading; indexing [-1] on an
        # empty list would raise IndexError and kill this agent's process.
        if not source_results:
            return None
        return source_results[-1][1]['temperature']
# Script starts.
# The main guard is required with the "spawn" start method (the default on
# Windows and macOS): child processes re-import this module, and without the
# guard they would re-run this script and fail while bootstrapping.
if __name__ == '__main__':
    results = {}
    # The "with" ensures we terminate the manager at the end.
    with Manager() as manager:
        # The manager is a subprocess in its own right. We can ask it to
        # manage a dictionary (or other python types) for us to be shared
        # among the other children.
        shared_info = manager.dict()
        hvac_agent1 = HVACAgent('hvac 1', shared_info)
        hvac_agent2 = HVACAgent('hvac 2', shared_info, delay=0.1)
        dumb_agent = DumbReadingAgent('dumb hvac1 reader', shared_info)
        agents = (hvac_agent1, hvac_agent2, dumb_agent)

        for agent in agents:
            agent.start()
        try:
            sleep(1)
        finally:
            # Stop and join even if interrupted, so no agent outlives the
            # manager whose proxy it is still using.
            for agent in agents:
                agent.stop()
            for agent in agents:
                agent.join()

        # Copy the data out while the manager is still running; the proxy
        # can no longer be used once the "with" block shuts the manager down.
        results = dict(shared_info)
    pprint(results)
我看过几篇关于此的帖子,所以我知道这应该很简单,但我似乎还是没弄明白。我不确定是需要创建一个工作进程池(Pool),还是使用 Queue 类。基本上,我希望能够创建多个进程,每个进程都自主运行(这就是它们都继承自 Agent 超类的原因)。
我想在主循环的随机时间步(tick)上更新每个代理。我在主循环和代理的 run 循环中使用不同取值的 time.sleep,
来模拟不同的处理器速度。
这是我的 Agent 超类:
# Generic base class: each agent runs in its own process (``mpc`` is the
# multiprocessing module). Attributes assigned in the parent are *copied*
# into the child when it starts; later plain-attribute assignments in the
# parent are not visible to the child -- only shared objects such as
# mpc.Event, mpc.Value or mpc.Queue cross the process boundary.
class Agent(mpc.Process):
    def __init__(self):
        """Initialise the process and the event used to request shutdown."""
        mpc.Process.__init__(self)
        # An Event is shared between parent and child, so setting it in the
        # parent (via shutdown()) is observed by the child's run loop.
        self.exit = mpc.Event()

    def run(self):
        """Agent main loop; generally meant to be overridden.

        The default implementation just blocks until shutdown() is called.
        Waiting on the event (instead of spinning on ``is_set()`` with
        ``pass``) avoids burning a whole CPU core while idle.
        """
        self.exit.wait()
        print("You exited!")

    def shutdown(self):
        """Ask the run loop to stop; does not wait for the process to end."""
        print("Shutdown initiated")
        self.exit.set()

    def communicate(self, value):
        """Receive a value; the base implementation only prints it.

        This method executes in whichever process calls it, so calling it
        from the parent does NOT deliver ``value`` to the child process.
        """
        print(value)
特定代理的子类(模拟 HVAC 系统):
class HVAC(Agent):
    """Simulated HVAC system running as an autonomous agent process.

    The measured temperature and humidity are kept in ``mpc.Value`` objects
    (shared memory) instead of plain attributes. Plain attributes are copied
    into the child when it starts, so readings set later by the parent via
    communicate() were never seen by run() in the child.

    Note that the constructor starts the process immediately.
    """

    def __init__(self, dt=70, dh=50.0):
        """Create and start the agent.

        :param dt: desired temperature set point.
        :param dh: desired humidity set point.
        """
        # super() must name *this* class: super(Agent, self) skipped
        # Agent.__init__ entirely. Agent.__init__ also creates self.exit.
        super(HVAC, self).__init__()
        self.__pref_heating = True
        self.__pref_cooling = True
        self.__desired_temperature = dt
        self.__desired_humidity = dh
        # 'd' (C double) so non-integer readings are not truncated.
        self.__meas_temperature = mpc.Value('d', 0)
        self.__meas_humidity = mpc.Value('d', 0.0)
        # Process-local: only updated/printed inside the child's run().
        self.__hvac_status = ""  # heating, cooling, off
        self.start()

    def run(self):  # handle AC or heater on
        """Child-process loop: run one control step every 0.5 s until shutdown."""
        while not self.exit.is_set():
            current = self._control_step()
            print("%s %g" % (self.__hvac_status, current))
            time.sleep(0.5)
        print("HVAC EXITED")

    def _control_step(self):
        """Apply one heat/cool decision to the shared reading; return it."""
        temp = self.__meas_temperature
        # Hold the Value's (re-entrant) lock across the read-modify-write so
        # a concurrent communicate() from the parent is not overwritten.
        with temp.get_lock():
            if temp.value < self.__desired_temperature:
                self.__hvac_status = 'heating'
                temp.value += 1
            elif temp.value > self.__desired_temperature:
                self.__hvac_status = 'cooling'
                temp.value -= 1  # cooling lowers the temperature (was += 1)
            else:
                self.__hvac_status = 'off'
            return temp.value

    def measureTemp(self):
        """Return the current shared temperature reading."""
        return self.__meas_temperature.value

    def measureHumidity(self):
        """Return the current shared humidity reading."""
        return self.__meas_humidity.value

    def communicate(self, updates):
        """Store new readings (keys 'temp' and 'humidity') in shared memory.

        Because the values are shared, the child's run loop sees them.
        """
        self.__meas_temperature.value = updates['temp']
        self.__meas_humidity.value = updates['humidity']
        print("Measured [%d] [%f]" % (self.measureTemp(), self.measureHumidity()))
我的主循环:
if __name__ == "__main__":
    print("Initializing subsystems")
    agents = {}
    agents['HVAC'] = HVAC()  # HVAC.__init__ starts the child process
    try:
        # Run simulation. `args` is expected to come from argument parsing
        # elsewhere in the script (not shown in this snippet).
        timestep = 0
        while timestep < args.timesteps:
            print("Timestep %d" % timestep)
            if timestep % 10 == 0:
                curr_temp = random.randrange(68, 72)
                curr_humidity = random.uniform(40.0, 60.0)
                agents['HVAC'].communicate({'temp': curr_temp, 'humidity': curr_humidity})
            time.sleep(1)
            timestep += 1
    finally:
        # Always stop the child, even on Ctrl-C or an error, so it is not
        # left running after the parent exits.
        agents['HVAC'].shutdown()
        # Wait for the child to finish: checking is_alive() immediately
        # after shutdown() races with the child's last loop iteration.
        agents['HVAC'].join()
    print("HVAC process state: %d" % agents['HVAC'].is_alive())
所以问题是,每当我在主循环中调用 agents['HVAC'].communicate(x) 时,我都能看到值被传递到 HVAC 子类的 run 循环中(它会正确打印接收到的值)。但是,该值始终没有被成功存储。
所以典型的输出如下所示:
Initializing subsystems
Timestep 0
Measured [68] [56.948675]
heating 1
heating 2
Timestep 1
heating 3
heating 4
Timestep 2
heating 5
heating 6
实际上,一旦出现 Measured [68],内部存储的值就应当更新,从而输出 68(而不是 heating 1、heating 2 等)。因此,HVAC 的 self.__meas_temperature 实际上并没有被正确更新。
编辑:经过一些研究,我意识到自己并不真正了解幕后发生了什么。每个子进程都在自己独立的虚拟内存中运行,与以这种方式"共享"的任何数据完全隔离,因此这样传递值是行不通的。我的新问题是:我不确定如何在多个进程之间共享全局值。
我正在研究 Queue 或 JoinableQueue,但不确定如何将 Queue 传入我现有的超类结构中(尤其是 mpc.Process.__init__(self) 这个调用)。
一个附带的问题是我是否可以让多个代理从队列中读取值而不将其从队列中拉出?例如,如果我想与多个代理共享一个 temperature
值,队列是否适用于此?
Pipe v Queue
假设您需要以下内容,这是一个建议的解决方案:
- 控制工人生命周期的集中管理器/主进程
- 工作进程做一些自包含的事情,然后将结果报告给经理和其他进程
不过,在展示之前,我想先说明一点:一般来说,除非你的任务是 CPU 密集型的,否则 multiprocessing 并不是真正合适的选择。这主要是因为它增加了复杂性,使用其他更高层的异步框架可能会更好。另外,你应该使用 Python 3,它要好得多!
话虽如此,借助 multiprocessing.Manager,用 multiprocessing 可以很容易地做到这一点。我是在 Python 3 中完成的,我认为它在 Python 2 中也应该能"直接运行",但我没有验证过。
from ctypes import c_bool
from multiprocessing import Manager, Process, Array, Value
from pprint import pprint
from time import sleep, time
class Agent(Process):
    def __init__(self, name, shared_dictionary, delay=0.5):
        """My take on your Agent.

        Key difference is that I've commonized the run-loop and used
        a shared value to signal when to stop, to demonstrate it.

        :param name: agent name; also the key under which this agent's
            list of ``(timestamp, result)`` pairs is stored.
        :param shared_dictionary: mapping shared by all agents, e.g. a
            ``Manager().dict()`` proxy; used to communicate between processes.
        :param delay: seconds to sleep between run-loop iterations
            (used to simulate slower agents).
        """
        super(Agent, self).__init__()
        self.name = name
        # This is going to be how we communicate between processes.
        self.shared_dictionary = shared_dictionary
        # Create a silo for us to use.
        shared_dictionary[name] = []
        # Shared-memory flag: set by stop() in the parent, read by run()
        # in the child.
        self.should_stop = Value(c_bool, False)
        self.delay = delay

    def get_next_results(self):
        """Produce this iteration's result; subclasses must override."""
        # NotImplementedError is the idiomatic "abstract method" signal and
        # subclasses RuntimeError, so existing `except RuntimeError` callers
        # still catch it. (abc.ABCMeta would enforce this at construction.)
        raise NotImplementedError('Subclasses must implement this')

    def run(self):
        """Loop until stop() is called, appending (timestamp, result) pairs."""
        ii = 0
        while not self.should_stop.value:
            ii += 1
            # debugging / monitoring
            print('%s %s run loop execution %d' % (
                type(self).__name__, self.name, ii))
            next_results = self.get_next_results()
            # With a Manager dict proxy, d[key] returns a *copy* of the list,
            # so an in-place .append() would be lost; `+=` reads, extends and
            # reassigns the key, which writes the new list back.
            self.shared_dictionary[self.name] += [(time(), next_results)]
            sleep(self.delay)

    def stop(self):
        """Request the run loop to exit after its current iteration."""
        self.should_stop.value = True
        print('%s %s stopped' % (type(self).__name__, self.name))
class HVACAgent(Agent):
    """Agent that reports a constant set of HVAC readings each iteration."""

    def get_next_results(self):
        # Stand-in for real work: the same readings on every iteration.
        readings = dict(temperature=5, pressure=7, humidity=9)
        return readings
class DumbReadingAgent(Agent):
    """A dumb agent to demonstrate workers reading other worker values."""

    # Name of the agent whose most recent temperature is read.
    source_name = 'hvac 1'

    def get_next_results(self):
        """Return the source agent's latest temperature, or None if none yet."""
        source_results = self.shared_dictionary.get(self.source_name)
        # Agent.__init__ creates the source's list empty, so it can exist
        # before the source appends its first reading; indexing [-1] on an
        # empty list would raise IndexError and kill this agent's process.
        if not source_results:
            return None
        return source_results[-1][1]['temperature']
# Script starts.
# The main guard is required with the "spawn" start method (the default on
# Windows and macOS): child processes re-import this module, and without the
# guard they would re-run this script and fail while bootstrapping.
if __name__ == '__main__':
    results = {}
    # The "with" ensures we terminate the manager at the end.
    with Manager() as manager:
        # The manager is a subprocess in its own right. We can ask it to
        # manage a dictionary (or other python types) for us to be shared
        # among the other children.
        shared_info = manager.dict()
        hvac_agent1 = HVACAgent('hvac 1', shared_info)
        hvac_agent2 = HVACAgent('hvac 2', shared_info, delay=0.1)
        dumb_agent = DumbReadingAgent('dumb hvac1 reader', shared_info)
        agents = (hvac_agent1, hvac_agent2, dumb_agent)

        for agent in agents:
            agent.start()
        try:
            sleep(1)
        finally:
            # Stop and join even if interrupted, so no agent outlives the
            # manager whose proxy it is still using.
            for agent in agents:
                agent.stop()
            for agent in agents:
                agent.join()

        # Copy the data out while the manager is still running; the proxy
        # can no longer be used once the "with" block shuts the manager down.
        results = dict(shared_info)
    pprint(results)