Cleanly terminating subprocesses that communicate via queues while they are running
I'm working on a larger project in which I have 2 threads (same process) and one separate process. One of the threads is the gui, the other thread is a sentinel thread observing the subprocess, and the subprocess is doing some heavy lifting with neural networks. The architecture looks somewhat like this:
I need to be able to cancel the process with the neural network and, respectively, end the sentinel thread. I have created a small example which shows the general architecture and my approach.
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


class Worker(Process):
    # The worker resembles the neural network. It does some calculations and
    # shares the information via the queue.

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            i += 1

    def stop(self):
        # I used the stop function for trying out some things, like using a
        # joinable queue and blocking execution as long as the queue is not
        # empty, which is not working
        self.queue.put(None)
        self.terminate()


class Listener(Thread):
    # This class resembles the sentinel thread. It checks in an infinite loop
    # for messages. In the real application I send signals via the signals and
    # slots design pattern to the gui and display the sent information.

    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)

    def run(self):
        self.worker.start()
        while True:
            data = self.queue.get()
            if data is not None:
                print(data)
            else:
                break
        print("broken")

    def stop(self):
        self.worker.stop()


class System:
    # This class resembles the gui

    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()
What is the problem?
As long as a process reads from or writes to the queue, and/or the queue is not emptied properly, one or both of the processes become zombie processes, which is basically a deadlock in some sense. So I need to find a way to handle the queue properly when terminating the process, so that the processes terminate without errors.
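For context, this matches a pitfall documented for multiprocessing: a process that has put items on a queue keeps a background feeder thread alive until the pipe is drained, so joining it before the queue is emptied can hang. A minimal sketch of just that pitfall (my own illustration, not the project code):

from multiprocessing import Process, Queue

def produce(q):
    # Put more data than fits into the underlying pipe buffer.
    for i in range(100_000):
        q.put(i)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=produce, args=(q,))
    p.start()
    p.join()  # can deadlock: the child's feeder thread still blocks on the full pipe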
What I have tried so far:

1. Using a joinable queue and join()ing it, calling task_done() for every item (a sketch of this API follows the results below)
2. Rewriting the SIGTERM signal handler to wait for the queue to be emptied
3. Using a joinable queue and only join()ing within the SIGTERM signal handler

The results:

1. Slowed the processing down drastically, but termination worked properly
2. and 3. Termination does not work the way I implemented it; sometimes it worked, sometimes it did not, so this approach yielded no reliable outcome or insight
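For reference, attempt (1) relies on the standard JoinableQueue protocol, roughly like this (a hedged sketch of the stock API, not my actual code):

from multiprocessing import JoinableQueue

q = JoinableQueue()
q.put(42)        # producer side

item = q.get()   # consumer side
q.task_done()    # must be acknowledged once per item ...

q.join()         # ... so that join() on the producer side can return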
My attempt at (3) looks like this:
# (imports added for completeness; .join() requires a joinable queue)
from multiprocessing import Process, JoinableQueue as Queue, Lock
from signal import signal, SIGTERM
from sys import exit


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
        self.abort = False
        self.lock = Lock()
        signal(SIGTERM, self.stop)

    def run(self):
        i = 0
        while True:
            self.lock.acquire()
            if self.abort:
                break  # note: breaks out with the lock still held
            else:
                self.queue.put(i)
                i += 1
            self.lock.release()
        exit(0)

    def stop(self, sig, frame):
        self.abort = True
        self.queue.put(None)
        self.queue.join()
        exit(0)
There are multiple possible approaches, but if you aim for a compromise between performance and robustness, I suggest using the signal handler only to set a .running flag on the worker, and checking it with while self.running within worker.run(). After the loop breaks, you send the sentinel value from the worker. This ensures that the sentinel value is always the last value in the queue and that all values are read by the listener. Together, this layout allows for a graceful worker shutdown, while still avoiding the more expensive synchronization that would be needed to check an exit condition on every iteration.
from multiprocessing import Process, Queue
from functools import partial
from threading import Thread
from time import sleep
import signal

SENTINEL = 'SENTINEL'


def sigterm_handler(signum, frame, worker):
    worker.shutdown()


def register_sigterm(worker):
    global sigterm_handler
    sigterm_handler = partial(sigterm_handler, worker=worker)
    signal.signal(signal.SIGTERM, sigterm_handler)


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
        self.running = False

    def run(self):
        register_sigterm(self)
        self.running = True
        i = 0
        while self.running:
            self.queue.put(i)
            i += 1
        self.queue.put(SENTINEL)

    def stop(self):  # called by parent
        self.terminate()

    def shutdown(self):  # called by child from signal-handler
        self.running = False


class Listener(Thread):

    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)

    def run(self):
        self.worker.start()
        for data in iter(self.queue.get, SENTINEL):
            print(data)

    def stop(self):
        self.worker.stop()
        self.worker.join()


class System:

    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()
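The Listener above relies on two-argument iter(), which keeps calling the passed callable until it returns the sentinel. A toy illustration of just that mechanism, with a plain list standing in for the queue:

items = [0, 1, 2, 'SENTINEL']
it = iter(lambda: items.pop(0), 'SENTINEL')
print(list(it))  # [0, 1, 2]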
Consider the following experiment.
The idea is to monkey-patch a queue instance in the child in such a way that, after receipt of SIGTERM, the next time queue.put() is called, the passed value plus a specified sentinel value are sent, then queue.close() and sys.exit() are called. This allows for a clean shutdown while avoiding repeated flag-checking.
multiprocessing.Queue() is actually just a method on multiprocessing.context.BaseContext, returning a pre-configured instance of multiprocessing.queues.Queue. To avoid interfering with it, I went with composition instead of inheritance. Testing so far implies it works just fine.
stqueue.py
import sys
import time
import signal
from functools import partial
from multiprocessing import current_process as curr_p


def _shutdown(self):
    self._xput = self.put
    self.put = self.final_put


def _final_put(self, obj):
    self._xput(obj)
    self._xput(self._xsentinel)
    self.close()
    sys.exit(0)


def _sigterm_handler(signum, frame, queue):
    print(f"[{time.ctime()}, {curr_p().name}] --- handling signal")
    queue.shutdown()


def register_sigterm_queue(queue, sentinel):
    """Monkey-patch a queue instance to shut down the process
    after the next call to `queue.put()` upon receipt of SIGTERM.
    """
    queue._xsentinel = sentinel
    queue.shutdown = _shutdown.__get__(queue)
    queue.final_put = _final_put.__get__(queue)
    global _sigterm_handler
    _sigterm_handler = partial(_sigterm_handler, queue=queue)
    signal.signal(signal.SIGTERM, _sigterm_handler)
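The _shutdown.__get__(queue) calls above exploit the fact that plain functions are descriptors: invoking __get__ with an instance returns a method bound to that one object. A standalone illustration of the mechanism:

class Box:
    pass

def label(self):
    return f"a box at {id(self):#x}"

b = Box()
b.label = label.__get__(b)  # bind the function to this single instance
print(b.label())            # now callable like an ordinary method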
main.py
import time
from threading import Thread
import multiprocessing as mp
from multiprocessing import Process, Queue, current_process as curr_p
import numpy as np

from stqueue import register_sigterm_queue

SENTINEL = 'SENTINEL'


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        register_sigterm_queue(self.queue, SENTINEL)  # <<<
        while True:
            print(f"[{time.ctime()}, {curr_p().name}] --- starting numpy")
            r = np.sum(
                np.unique(np.random.randint(0, 2500, 100_000_000))
            )
            print(f"[{time.ctime()}, {curr_p().name}] --- ending numpy")
            self.queue.put(r)

    def stop(self):  # called by parent
        self.terminate()

...

if __name__ == "__main__":
    import logging
    mp.log_to_stderr(logging.DEBUG)
    system = System()
    system.start()
    time.sleep(10)
    print(f"[{time.ctime()}, {curr_p().name}] --- sending signal")
    system.stop()
    print(f"[{time.ctime()}, {curr_p().name}] --- signal send")
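For reference, Process.terminate() delivers SIGTERM on Unix, which is what triggers the patched queue here. Sending the signal by hand would look like this (worker stands for any started Process instance):

import os, signal
os.kill(worker.pid, signal.SIGTERM)  # equivalent to worker.terminate() on Unix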
Example output:
[DEBUG/MainProcess] created semlock with handle 140000699432960
[DEBUG/MainProcess] created semlock with handle 140000699428864
[DEBUG/MainProcess] created semlock with handle 140000664752128
[DEBUG/MainProcess] Queue._after_fork()
[Sat Oct 24 21:59:58 2020, Worker-1] --- starting numpy
[DEBUG/Worker-1] recreated blocker with handle 140000699432960
[DEBUG/Worker-1] recreated blocker with handle 140000699428864
[DEBUG/Worker-1] recreated blocker with handle 140000664752128
[DEBUG/Worker-1] Queue._after_fork()
[INFO/Worker-1] child process calling self.run()
[DEBUG/Worker-1] Queue._start_thread()
[DEBUG/Worker-1] doing self._thread.start()
[DEBUG/Worker-1] starting thread to feed data to pipe
[DEBUG/Worker-1] ... done self._thread.start()
[Sat Oct 24 22:00:04 2020, Worker-1] --- ending numpy
[Sat Oct 24 22:00:04 2020, Worker-1] --- starting numpy
3123750
[Sat Oct 24 22:00:08 2020, MainProcess] --- sending signal
[Sat Oct 24 22:00:10 2020, Worker-1] --- handling signal
[DEBUG/Worker-1] telling queue thread to quit
[INFO/Worker-1] process shutting down
[DEBUG/Worker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Worker-1] running the remaining "atexit" finalizers
[DEBUG/Worker-1] joining queue thread
[DEBUG/Worker-1] feeder thread got sentinel -- exiting
[DEBUG/Worker-1] ... queue thread joined
[INFO/Worker-1] process exiting with exitcode 0
[Sat Oct 24 22:00:10 2020, Worker-1] --- ending numpy
3123750
[Sat Oct 24 22:00:10 2020, MainProcess] --- signal send
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
Process finished with exit code 0