在 Python 中停止线程池中的进程
Stopping processes in ThreadPool in Python
我一直在尝试为控制某些硬件的库编写交互式包装器(用于 ipython)。有些调用在 IO 上很重,因此并行执行任务是有意义的。使用线程池(几乎)效果很好:
from multiprocessing.pool import ThreadPool
class hardware():
def __init__(IPaddress):
connect_to_hardware(IPaddress)
def some_long_task_to_hardware(wtime):
wait(wtime)
result = 'blah'
return result
pool = ThreadPool(processes=4)
Threads=[]
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)]
for tt in range(4):
task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000))
threads.append(task)
alive = [True]*4
Try:
while any(alive) :
for tt in range(4): alive[tt] = not threads[tt].ready()
do_other_stuff_for_a_bit()
except:
#some command I cannot find that will stop the threads...
raise
for tt in range(4): print(threads[tt].get())
如果用户想要停止进程或者 do_other_stuff_for_a_bit()
中出现 IO 错误,问题就会出现。按 Ctrl+C 停止主进程,但工作线程继续 运行ning 直到他们当前的任务完成。
有什么方法可以停止这些线程而不必重写库或让用户退出 python?我在其他示例中看到的 pool.terminate()
和 pool.join()
似乎无法完成这项工作。
实际例程(而不是上面的简化版本)使用日志记录,虽然所有工作线程都在某个时候关闭,但我可以看到它们启动的进程 运行ning 继续进行直到完成(作为硬件,我可以通过观察整个房间看到它们的效果)。
这是在 python 2.7 中。
更新:
解决方案似乎是改用 multiprocessing.Process 而不是线程池。我试过的测试代码是 运行 foo_pulse:
class foo(object):
def foo_pulse(self,nPulse,name): #just one method of *many*
print('starting pulse for '+name)
result=[]
for ii in range(nPulse):
print('on for '+name)
time.sleep(2)
print('off for '+name)
time.sleep(2)
result.append(ii)
return result,name
如果您尝试使用 ThreadPool 运行ning 这个,那么 ctrl-C 不会从 运行ning 停止 foo_pulse(即使它确实会立即终止线程,打印语句继续前进:
from multiprocessing.pool import ThreadPool
import time
def test(nPulse):
a=foo()
pool=ThreadPool(processes=4)
threads=[]
for rn in range(4) :
r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn)))
threads.append(r)
alive=[True]*4
try:
while any(alive) : #wait until all threads complete
for rn in range(4):
alive[rn] = not threads[rn].ready()
time.sleep(1)
except : #stop threads if user presses ctrl-c
print('trying to stop threads')
pool.terminate()
print('stopped threads') # this line prints but output from foo_pulse carried on.
raise
else :
for t in threads : print(t.get())
然而,使用 multiprocessing.Process 的版本按预期工作:
import multiprocessing as mp
import time
def test_pro(nPulse):
pros=[]
ans=[]
a=foo()
for rn in range(4) :
q=mp.Queue()
ans.append(q)
r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))})
r.start()
pros.append(r)
try:
for p in pros : p.join()
print('all done')
except : #stop threads if user stops findRes
print('trying to stop threads')
for p in pros : p.terminate()
print('stopped threads')
else :
print('output here')
for q in ans :
print(q.get())
print('exit time')
我在这里为库 foo 定义了一个包装器(这样就不需要重写)。如果不需要 return 值,则此包装器也不是:
def wrapper(a,target,q,args=(),kwargs={}):
'''Used when return value is wanted'''
q.put(getattr(a,target)(*args,**kwargs))
从文档中我看不出池无法工作的原因(除了错误)。
这是对并行性的一个非常有趣的应用。
但是,如果您使用 multiprocessing
,目标是让多个进程 运行 并行,而不是一个进程 运行 多个线程。
考虑使用 multiprocessing
:
来实施这几项更改
您有这些函数将 运行 并行:
import time
import multiprocessing as mp
def some_long_task_from_library(wtime):
time.sleep(wtime)
class MyException(Exception): pass
def do_other_stuff_for_a_bit():
time.sleep(5)
raise MyException("Something Happened...")
让我们创建并启动进程,比如 4:
procs = [] # this is not a Pool, it is just a way to handle the
# processes instead of calling them p1, p2, p3, p4...
for _ in range(4):
p = mp.Process(target=some_long_task_from_library, args=(1000,))
p.start()
procs.append(p)
mp.active_children() # this joins all the started processes, and runs them.
这些进程是 运行 并行的,大概是在一个单独的 cpu 核心中,但这要由 OS 来决定。您可以查看您的系统监视器。
与此同时,您 运行 一个会中断的进程,并且您想停止 运行ning 进程,而不是让它们成为孤儿:
try:
do_other_stuff_for_a_bit()
except MyException as exc:
print(exc)
print("Now stopping all processes...")
for p in procs:
p.terminate()
print("The rest of the process will continue")
如果在一个或所有子进程终止时继续主进程没有意义,您应该处理主程序的退出。
希望对您有所帮助,您可以为您的图书馆改编其中的一些内容。
在回答池为何不起作用的问题时,这是由于(如 Documentation 中引用的)然后 main 需要由子进程,并且由于该项目的性质,正在使用交互式 python。
同时,ThreadPool 为何会出现的原因尚不清楚 - 尽管线索就在名称中。 ThreadPool 使用 multiprocessing.dummy 创建其工作进程池,如前所述,here 只是 Threading 模块的包装器。池使用 multiprocessing.Process。通过这个测试可以看出:
p=ThreadPool(processes=3)
p._pool[0]
<DummyProcess(Thread23, started daemon 12345)> #no terminate() method
p=Pool(processes=3)
p._pool[0]
<Process(PoolWorker-1, started daemon)> #has handy terminate() method if needed
由于线程没有终止方法,因此工作线程会继续 运行 直到它们完成当前任务。杀死线程很麻烦(这就是我尝试使用多处理模块的原因)但解决方案是 here.
关于使用上述解决方案的一个警告:
def wrapper(a,target,q,args=(),kwargs={}):
'''Used when return value is wanted'''
q.put(getattr(a,target)(*args,**kwargs))
对象实例中的属性更改不会传回主程序。例如上面的 class foo 也可以有如下方法:
def 添加IP(新IP):
self.hardwareIP=新IP
调用 r=mp.Process(target=a.addIP,args=(127.0.0.1))
不会更新 a
。
复杂对象的唯一解决方法似乎是使用自定义 manager
共享内存,它可以访问对象的方法和属性 a
对于非常大的复杂对象基于库,最好使用 dir(foo)
来填充管理器。如果我能弄清楚如何用一个例子来更新这个答案(对于我未来的自己和其他人)。
如果出于某些原因使用线程更可取,我们可以使用 this.
We can send some siginal to the threads we want to terminate. The simplest siginal is global variable:
import time
from multiprocessing.pool import ThreadPool
_FINISH = False
def hang():
while True:
if _FINISH:
break
print 'hanging..'
time.sleep(10)
def main():
global _FINISH
pool = ThreadPool(processes=1)
pool.apply_async(hang)
time.sleep(10)
_FINISH = True
pool.terminate()
pool.join()
print 'main process exiting..'
if __name__ == '__main__':
main()
我一直在尝试为控制某些硬件的库编写交互式包装器(用于 ipython)。有些调用在 IO 上很重,因此并行执行任务是有意义的。使用线程池(几乎)效果很好:
from multiprocessing.pool import ThreadPool
class hardware():
def __init__(IPaddress):
connect_to_hardware(IPaddress)
def some_long_task_to_hardware(wtime):
wait(wtime)
result = 'blah'
return result
pool = ThreadPool(processes=4)
Threads=[]
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)]
for tt in range(4):
task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000))
threads.append(task)
alive = [True]*4
Try:
while any(alive) :
for tt in range(4): alive[tt] = not threads[tt].ready()
do_other_stuff_for_a_bit()
except:
#some command I cannot find that will stop the threads...
raise
for tt in range(4): print(threads[tt].get())
如果用户想要停止进程或者 do_other_stuff_for_a_bit()
中出现 IO 错误,问题就会出现。按 Ctrl+C 停止主进程,但工作线程继续 运行ning 直到他们当前的任务完成。
有什么方法可以停止这些线程而不必重写库或让用户退出 python?我在其他示例中看到的 pool.terminate()
和 pool.join()
似乎无法完成这项工作。
实际例程(而不是上面的简化版本)使用日志记录,虽然所有工作线程都在某个时候关闭,但我可以看到它们启动的进程 运行ning 继续进行直到完成(作为硬件,我可以通过观察整个房间看到它们的效果)。
这是在 python 2.7 中。
更新:
解决方案似乎是改用 multiprocessing.Process 而不是线程池。我试过的测试代码是 运行 foo_pulse:
class foo(object):
def foo_pulse(self,nPulse,name): #just one method of *many*
print('starting pulse for '+name)
result=[]
for ii in range(nPulse):
print('on for '+name)
time.sleep(2)
print('off for '+name)
time.sleep(2)
result.append(ii)
return result,name
如果您尝试使用 ThreadPool 运行ning 这个,那么 ctrl-C 不会从 运行ning 停止 foo_pulse(即使它确实会立即终止线程,打印语句继续前进:
from multiprocessing.pool import ThreadPool
import time
def test(nPulse):
a=foo()
pool=ThreadPool(processes=4)
threads=[]
for rn in range(4) :
r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn)))
threads.append(r)
alive=[True]*4
try:
while any(alive) : #wait until all threads complete
for rn in range(4):
alive[rn] = not threads[rn].ready()
time.sleep(1)
except : #stop threads if user presses ctrl-c
print('trying to stop threads')
pool.terminate()
print('stopped threads') # this line prints but output from foo_pulse carried on.
raise
else :
for t in threads : print(t.get())
然而,使用 multiprocessing.Process 的版本按预期工作:
import multiprocessing as mp
import time
def test_pro(nPulse):
pros=[]
ans=[]
a=foo()
for rn in range(4) :
q=mp.Queue()
ans.append(q)
r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))})
r.start()
pros.append(r)
try:
for p in pros : p.join()
print('all done')
except : #stop threads if user stops findRes
print('trying to stop threads')
for p in pros : p.terminate()
print('stopped threads')
else :
print('output here')
for q in ans :
print(q.get())
print('exit time')
我在这里为库 foo 定义了一个包装器(这样就不需要重写)。如果不需要 return 值,则此包装器也不是:
def wrapper(a,target,q,args=(),kwargs={}):
'''Used when return value is wanted'''
q.put(getattr(a,target)(*args,**kwargs))
从文档中我看不出池无法工作的原因(除了错误)。
这是对并行性的一个非常有趣的应用。
但是,如果您使用 multiprocessing
,目标是让多个进程 运行 并行,而不是一个进程 运行 多个线程。
考虑使用 multiprocessing
:
您有这些函数将 运行 并行:
import time
import multiprocessing as mp
def some_long_task_from_library(wtime):
time.sleep(wtime)
class MyException(Exception): pass
def do_other_stuff_for_a_bit():
time.sleep(5)
raise MyException("Something Happened...")
让我们创建并启动进程,比如 4:
procs = [] # this is not a Pool, it is just a way to handle the
# processes instead of calling them p1, p2, p3, p4...
for _ in range(4):
p = mp.Process(target=some_long_task_from_library, args=(1000,))
p.start()
procs.append(p)
mp.active_children() # this joins all the started processes, and runs them.
这些进程是 运行 并行的,大概是在一个单独的 cpu 核心中,但这要由 OS 来决定。您可以查看您的系统监视器。
与此同时,您 运行 一个会中断的进程,并且您想停止 运行ning 进程,而不是让它们成为孤儿:
try:
do_other_stuff_for_a_bit()
except MyException as exc:
print(exc)
print("Now stopping all processes...")
for p in procs:
p.terminate()
print("The rest of the process will continue")
如果在一个或所有子进程终止时继续主进程没有意义,您应该处理主程序的退出。
希望对您有所帮助,您可以为您的图书馆改编其中的一些内容。
在回答池为何不起作用的问题时,这是由于(如 Documentation 中引用的)然后 main 需要由子进程,并且由于该项目的性质,正在使用交互式 python。
同时,ThreadPool 为何会出现的原因尚不清楚 - 尽管线索就在名称中。 ThreadPool 使用 multiprocessing.dummy 创建其工作进程池,如前所述,here 只是 Threading 模块的包装器。池使用 multiprocessing.Process。通过这个测试可以看出:
p=ThreadPool(processes=3)
p._pool[0]
<DummyProcess(Thread23, started daemon 12345)> #no terminate() method
p=Pool(processes=3)
p._pool[0]
<Process(PoolWorker-1, started daemon)> #has handy terminate() method if needed
由于线程没有终止方法,因此工作线程会继续 运行 直到它们完成当前任务。杀死线程很麻烦(这就是我尝试使用多处理模块的原因)但解决方案是 here.
关于使用上述解决方案的一个警告:
def wrapper(a,target,q,args=(),kwargs={}):
'''Used when return value is wanted'''
q.put(getattr(a,target)(*args,**kwargs))
对象实例中的属性更改不会传回主程序。例如上面的 class foo 也可以有如下方法:
def 添加IP(新IP):
self.hardwareIP=新IP
调用 r=mp.Process(target=a.addIP,args=(127.0.0.1))
不会更新 a
。
复杂对象的唯一解决方法似乎是使用自定义 manager
共享内存,它可以访问对象的方法和属性 a
对于非常大的复杂对象基于库,最好使用 dir(foo)
来填充管理器。如果我能弄清楚如何用一个例子来更新这个答案(对于我未来的自己和其他人)。
如果出于某些原因使用线程更可取,我们可以使用 this.
We can send some siginal to the threads we want to terminate. The simplest siginal is global variable:
import time
from multiprocessing.pool import ThreadPool
_FINISH = False
def hang():
while True:
if _FINISH:
break
print 'hanging..'
time.sleep(10)
def main():
global _FINISH
pool = ThreadPool(processes=1)
pool.apply_async(hang)
time.sleep(10)
_FINISH = True
pool.terminate()
pool.join()
print 'main process exiting..'
if __name__ == '__main__':
main()