使用多处理从前导和尾随迭代列表
Iterate over list from leading and trailing with multiprocessing
我想使用 multiprocessing 迭代具有 2 个函数的列表,一个函数从前导迭代 main_list,从尾随迭代另一个函数,每次迭代样本列表时我都想要这个函数(g
) 将元素放在主列表中,直到其中一个在列表中找到重复项,然后我想终止两个进程和 return 看到的元素。
我希望第一个进程 return :
['a', 'b', 'c', 'd', 'e', 'f']
第二个return:
['l', 'k', 'j', 'i', 'h', 'g']
这是我的代码,return有一个错误:
from multiprocessing import Process, Manager
manager = Manager()
d = manager.list()
# Fn definitions and such
def a(main_path,g,l=[]):
for i in g:
l.append(i)
print 'a'
if i in main_path:
return l
main_path.append(i)
def b(main_path,g,l=[]):
for i in g:
l.append(i)
print 'b'
if i in main_path:
return l
main_path.append(i)
g=['a','b','c','d','e','f','g','h','i','j','k','l']
g2=g[::-1]
p1 = Process(target=a, args=(d,g))
p2 = Process(target=b, args=(d,g2))
p1.start()
p2.start()
这是 Traceback
:
a
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/bluebird/Desktop/persiantext.py", line 17, in a
if i in main_path:
File "<string>", line 2, in __contains__
File "/usr/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
self._connect()
File "/usr/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib/python2.7/multiprocessing/connection.py", line 169, in Client
b
c = SocketClient(address)
File "/usr/lib/python2.7/multiprocessing/connection.py", line 304, in SocketClient
s.connect(address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directory
Process Process-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/bluebird/Desktop/persiantext.py", line 27, in b
if i in main_path:
File "<string>", line 2, in __contains__
File "/usr/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
self._connect()
File "/usr/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib/python2.7/multiprocessing/connection.py", line 169, in Client
c = SocketClient(address)
File "/usr/lib/python2.7/multiprocessing/connection.py", line 304, in SocketClient
s.connect(address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directory
请注意,我不知道如何在其中一个进程找到重复元素后终止这两个进程!!
您的代码中还有其他各种问题,但是由于我已经在您的其他问题中解释了这些问题,所以我不会在这里讨论它们。
新问题是您没有join
处理您的child进程。在您的线程版本中,这不是问题,只是因为您的主线程在结束前不小心有一个 "block forever" 。但是在这里,你没有那个,所以主进程到达脚本的末尾,而后台进程仍然是 运行.
发生这种情况时,您的代码将执行的操作并未完全定义。*但基本上,您正在破坏管理器 object,这会关闭管理器服务器,而后台进程仍在使用它,因此他们将在下次尝试访问托管 object.
时引发异常
解决方案是在脚本末尾添加 p1.join()
和 p2.join()
。
但这实际上只会让您回到与线程代码相同的情况(除了最后不会永远阻塞)。您仍然拥有完全序列化的代码和大的竞争条件,等等。
如果你好奇为什么会这样:
在脚本的末尾,您模块的所有全局变量都超出了范围。** 因为这些变量是您对管理器和进程的唯一引用 objects,那些 objects 得到 garbage-collected,它们的析构函数被调用。
对于管理器 object,析构函数关闭服务器。
对于进程object,我不完全确定,但我认为析构函数什么都不做(而不是加入它and/or中断它).相反,有一个 atexit 函数,它在所有析构函数之后运行,加入任何 still-running 进程。***
所以,首先经理离开,然后主进程开始等待 children 完成;下次每个人都尝试访问托管 object 时,它会失败并退出。一旦他们都这样做了,主进程完成等待并退出。
* 3.2 中的 multiprocessing
更改和 3.4 中的关闭更改使事情变得更加清晰,所以如果我们不谈论 2.7,那么 "here's what usually happens but not always" 和 "here's what happens in one particular implementation on one particular platform".
** 2.7 实际上并不能保证这一点,并且 garbage-collecting 所有模块的全局变量并不总是发生。但是在这个特别简单的例子中,我 pretty 确定它 will 总是这样工作,至少在 CPython 中是这样,尽管我不想要试图解释原因。
*** 这绝对是它与线程一起工作的方式,至少在 Unix 上的 CPython 2.7 上是这样……同样,这根本没有记录在 2.x 中,所以你只能告诉通过阅读源代码或在对您重要的 platforms/implementations/versions 上进行实验……而且我不想通过源代码进行跟踪,除非可能会发现一些令人费解或有趣的东西。
我想使用 multiprocessing 迭代具有 2 个函数的列表,一个函数从前导迭代 main_list,从尾随迭代另一个函数,每次迭代样本列表时我都想要这个函数(g
) 将元素放在主列表中,直到其中一个在列表中找到重复项,然后我想终止两个进程和 return 看到的元素。
我希望第一个进程 return :
['a', 'b', 'c', 'd', 'e', 'f']
第二个return:
['l', 'k', 'j', 'i', 'h', 'g']
这是我的代码,return有一个错误:
from multiprocessing import Process, Manager
manager = Manager()
d = manager.list()
# Fn definitions and such
def a(main_path,g,l=[]):
for i in g:
l.append(i)
print 'a'
if i in main_path:
return l
main_path.append(i)
def b(main_path,g,l=[]):
for i in g:
l.append(i)
print 'b'
if i in main_path:
return l
main_path.append(i)
g=['a','b','c','d','e','f','g','h','i','j','k','l']
g2=g[::-1]
p1 = Process(target=a, args=(d,g))
p2 = Process(target=b, args=(d,g2))
p1.start()
p2.start()
这是 Traceback
:
a
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/bluebird/Desktop/persiantext.py", line 17, in a
if i in main_path:
File "<string>", line 2, in __contains__
File "/usr/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
self._connect()
File "/usr/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib/python2.7/multiprocessing/connection.py", line 169, in Client
b
c = SocketClient(address)
File "/usr/lib/python2.7/multiprocessing/connection.py", line 304, in SocketClient
s.connect(address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directory
Process Process-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/bluebird/Desktop/persiantext.py", line 27, in b
if i in main_path:
File "<string>", line 2, in __contains__
File "/usr/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
self._connect()
File "/usr/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib/python2.7/multiprocessing/connection.py", line 169, in Client
c = SocketClient(address)
File "/usr/lib/python2.7/multiprocessing/connection.py", line 304, in SocketClient
s.connect(address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directory
请注意,我不知道如何在其中一个进程找到重复元素后终止这两个进程!!
您的代码中还有其他各种问题,但是由于我已经在您的其他问题中解释了这些问题,所以我不会在这里讨论它们。
新问题是您没有join
处理您的child进程。在您的线程版本中,这不是问题,只是因为您的主线程在结束前不小心有一个 "block forever" 。但是在这里,你没有那个,所以主进程到达脚本的末尾,而后台进程仍然是 运行.
发生这种情况时,您的代码将执行的操作并未完全定义。*但基本上,您正在破坏管理器 object,这会关闭管理器服务器,而后台进程仍在使用它,因此他们将在下次尝试访问托管 object.
时引发异常解决方案是在脚本末尾添加 p1.join()
和 p2.join()
。
但这实际上只会让您回到与线程代码相同的情况(除了最后不会永远阻塞)。您仍然拥有完全序列化的代码和大的竞争条件,等等。
如果你好奇为什么会这样:
在脚本的末尾,您模块的所有全局变量都超出了范围。** 因为这些变量是您对管理器和进程的唯一引用 objects,那些 objects 得到 garbage-collected,它们的析构函数被调用。
对于管理器 object,析构函数关闭服务器。
对于进程object,我不完全确定,但我认为析构函数什么都不做(而不是加入它and/or中断它).相反,有一个 atexit 函数,它在所有析构函数之后运行,加入任何 still-running 进程。***
所以,首先经理离开,然后主进程开始等待 children 完成;下次每个人都尝试访问托管 object 时,它会失败并退出。一旦他们都这样做了,主进程完成等待并退出。
* 3.2 中的 multiprocessing
更改和 3.4 中的关闭更改使事情变得更加清晰,所以如果我们不谈论 2.7,那么 "here's what usually happens but not always" 和 "here's what happens in one particular implementation on one particular platform".
** 2.7 实际上并不能保证这一点,并且 garbage-collecting 所有模块的全局变量并不总是发生。但是在这个特别简单的例子中,我 pretty 确定它 will 总是这样工作,至少在 CPython 中是这样,尽管我不想要试图解释原因。
*** 这绝对是它与线程一起工作的方式,至少在 Unix 上的 CPython 2.7 上是这样……同样,这根本没有记录在 2.x 中,所以你只能告诉通过阅读源代码或在对您重要的 platforms/implementations/versions 上进行实验……而且我不想通过源代码进行跟踪,除非可能会发现一些令人费解或有趣的东西。