Python detect and properly handle multiprocessing Manager shutdown
When I use a Manager().list() object in several child processes, I get an IOError: [Errno 32] Broken pipe exception. I understand that, per the documentation for the Manager class:
Manager processes will be shutdown as soon as they are garbage collected or their parent process exits.
https://docs.python.org/2/library/multiprocessing.html#managers
I assume the manager shutting down is what causes this exception:
```
Traceback (most recent call last):
  File "/opt/django/mdp/bin/mdp.py", line 83, in run
    a_tags, iframe_tags = self.get_page_links()
  File "/opt/django/mdp/bin/mdp.py", line 135, in get_page_links
    if (stuff not in self.work_tracker) and (stuff not in a_tags):
  File "<string>", line 2, in __contains__
  File "/usr/lib64/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe
```
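A quick way to test that assumption is to shut a manager down by hand and then use a list proxy it created. This is a minimal sketch, not the original code: it is written for Python 3, the helper name `use_proxy_after_shutdown` is hypothetical, and it selects the `fork` start method explicitly (a POSIX-only assumption, chosen because that is what Python 2.7 on Linux always used). `manager.shutdown()` runs the same finalizer that fires when the manager is garbage collected or its parent exits, so the proxy call afterwards should fail the same way. In Python 3 `IOError` is an alias of `OSError`; depending on timing the failure may show up as `BrokenPipeError` (errno 32), another `OSError`, or `EOFError`.

```python
import multiprocessing

# Assumption: POSIX "fork" start method, the only one Python 2.7 used on Linux.
ctx = multiprocessing.get_context("fork")


def use_proxy_after_shutdown():
    """Hypothetical helper: return the exception raised by a list proxy
    whose manager process is already gone, or None if the call succeeds."""
    manager = ctx.Manager()
    shared = manager.list()
    shared.append("job")        # fine: the manager process is still running
    manager.shutdown()          # same finalizer as manager GC / parent exit
    try:
        _ = "job" in shared     # __contains__ -> _callmethod -> conn.send(...)
    except (OSError, EOFError) as exc:
        return exc
    return None


if __name__ == "__main__":
    err = use_proxy_after_shutdown()
    print(type(err).__name__, err)
```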
For completeness, here is my main function:
```python
import json
from multiprocessing import Manager, Queue

if __name__ == "__main__":
    manager = Manager()
    work_queue = Queue()
    work_tracker = manager.list()
    work_results = manager.list()
    work_queue.put('work fed in by a loop or something')
    workers = 4
    processes = []
    for i in range(workers):
        mdp = MDP(work_queue, work_tracker, work_results)
        mdp.start()
    for proc in processes:
        proc.join()
    printable_results = {}
    for each in work_results:
        printable_results[each['stuff']] = each
    print(json.dumps(printable_results, indent=4))
```
Below is a shortened version of the "MDP" process:
```python
import traceback
from multiprocessing import Event, Process


class MDP(Process):
    def __init__(self, work_queue, work_tracker, work_results):
        Process.__init__(self)
        self.exit = Event()
        self.work_queue = work_queue
        self.work_tracker = work_tracker
        self.results = work_results

    def run(self):
        # Main loop: keep pulling jobs until shutdown() sets the exit flag
        while not self.exit.is_set():
            if self.work_queue.empty():
                print('[!] %s sees that the work queue is empty' % self.name)
                self.shutdown()
            else:
                try:
                    job = self.work_queue.get(timeout=3)
                    # placeholder for the real work (not shown in the question)
                    results = do_something_on_another_thing_with_the_thing(job)
                    self.results.append(results)
                except KeyboardInterrupt:
                    print('[!] %s got Ctrl-C! Stopping!' % self.name)
                    self.shutdown()
                except IOError as ex:
                    print(traceback.format_exc())
                except Exception as ex:
                    print(traceback.format_exc())

    def shutdown(self):
        print("[!] Shutting down %s" % self.name)
        self.exit.set()
```
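I don't want the manager to shut down as soon as one child process sees that the queue is empty and shuts itself down. How can I keep the manager list open long enough for the work to finish?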
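It always amuses me how staring at the same code for a whole day can turn you into a complete idiot. On a whim I asked a coworker, who pointed out: if the processes list is empty, how are the processes supposed to be joined?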
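The coworker's point is easy to confirm: looping over an empty list never calls `join()`, so the parent walks straight past the "wait for the workers" step while they are still running, then reaches the end of the script, where the manager is shut down underneath them. This is a minimal sketch under stated assumptions: Python 3, an explicit POSIX `fork` start method, and hypothetical names `blocked_worker` and `count_alive_after_empty_join`. The workers block on an `Event` so the check does not depend on timing.

```python
import multiprocessing

# Assumption: POSIX "fork" start method, as on the original Linux box.
ctx = multiprocessing.get_context("fork")


def blocked_worker(release):
    # Hypothetical stand-in for a long-running MDP.run(): wait until released.
    release.wait()


def count_alive_after_empty_join(workers=4):
    """Start workers the way the buggy loop does (never appending them to
    `processes`), run the join loop, and report how many are still alive."""
    release = ctx.Event()
    processes = []
    started = []                       # only so this demo can clean up afterwards
    for _ in range(workers):
        p = ctx.Process(target=blocked_worker, args=(release,))
        p.start()                      # note: no processes.append(p)
        started.append(p)
    for proc in processes:             # empty list: join() is never called
        proc.join()
    alive = sum(p.is_alive() for p in started)
    release.set()                      # let the workers finish
    for p in started:
        p.join()
    return alive


if __name__ == "__main__":
    print(count_alive_after_empty_join())  # prints 4
```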
This:
```python
for i in range(workers):
    mdp = MDP(work_queue, work_tracker, work_results)
    mdp.start()
```
should actually have been this:
```python
for i in range(workers):
    mdp = MDP(work_queue, work_tracker, work_results)
    mdp.start()
    processes.append(mdp)  # HURRRRRRRRRRRR
```
I no longer get the broken pipe errors. The pipes stay intact. All of them. In one piece.
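For reference, here is a self-contained sketch of the corrected pattern, written for Python 3. It is not the original MDP code: `do_work`, `Worker` and `run_all` are hypothetical names, the explicit `fork` start method is a POSIX-only assumption, and it swaps the `work_queue.empty()` check for one `None` sentinel per worker, because `empty()` on a `multiprocessing.Queue` is only approximate. The essential part matches the fix in the answer: every started process is appended to `processes` and joined before the results are read and before the manager goes away.

```python
import multiprocessing

# Assumption: POSIX "fork" start method, as on the original Linux/Python 2.7 setup.
ctx = multiprocessing.get_context("fork")


def do_work(job):
    # Hypothetical stand-in for do_something_on_another_thing_with_the_thing().
    return {"stuff": job, "length": len(job)}


class Worker(ctx.Process):
    def __init__(self, work_queue, work_results):
        super().__init__()
        self.work_queue = work_queue
        self.work_results = work_results   # manager.list() proxy

    def run(self):
        while True:
            job = self.work_queue.get()
            if job is None:                # sentinel: no more work for this worker
                break
            self.work_results.append(do_work(job))


def run_all(jobs, workers=4):
    with ctx.Manager() as manager:         # manager shuts down when the block exits
        work_queue = ctx.Queue()
        work_results = manager.list()
        for job in jobs:
            work_queue.put(job)
        for _ in range(workers):
            work_queue.put(None)           # one sentinel per worker

        processes = []
        for _ in range(workers):
            p = Worker(work_queue, work_results)
            p.start()
            processes.append(p)            # the fix: keep a handle to each worker

        for proc in processes:             # wait for every worker to finish
            proc.join()

        # All workers are done and the manager is still alive, so the proxy is safe to read.
        return {r["stuff"]: r for r in work_results[:]}


if __name__ == "__main__":
    print(run_all(["a", "bb", "ccc"]))
```

Because the `with` block returns only after every worker has been joined, the manager cannot be shut down while a worker is still talking to it.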