Python - 通知另一个在子进程上阻塞的线程
Python - Notifying another thread blocked on subprocess
我正在 linux 的 python 3.4 中创建一个带有 Web 前端的自定义作业调度程序。该程序创建一个守护进程(消费者)线程,等待 PriorityQueue 中的作业可用。这些作业可以通过将它们添加到队列的 Web 界面手动添加。当消费者线程找到工作时,它使用 subprocess.run 执行一个程序,并等待它完成。
工作线程的基本思想:
class Worker(threading.Thread):
def __init__(self, queue):
self.queue = queue
# more code here
def run(self):
while True:
try:
job = self.queue.get()
#do some work
proc = subprocess.run("myprogram", timeout=my_timeout)
#do some more things
except TimeoutExpired:
#do some administration
self.queue.add(job)
但是:
- 这个消费者应该能够从前端(主线程)接收到某种信号,它应该停止当前的作业,而是处理队列中的下一个作业(保存当前作业的状态并添加它再次到队列的末尾)。这可能(并且很可能)在 subprocess.run().
上被阻止时发生
- 子进程可以简单地被杀死(执行的程序将状态保存在文件中)但是工作线程需要对被杀死的作业进行一些管理以确保它可以在以后恢复。
- 可以有多个这样的工作线程。
- 信号处理程序不是一个选项(因为它们总是由主线程处理,主线程是一个网络服务器,不应该被这个打扰)。
- 在这种情况下,拥有一个进程主动轮询事件(例如子进程退出、超时发生或中断事件)的事件循环并不是真正的解决方案,而是一种丑陋的 hack。这些工作对性能要求很高,并且不需要不断的上下文切换。
我应该使用什么同步原语来中断此线程或确保它以阻塞方式同时等待多个事件?
我认为您不小心忽略了一个简单的解决方案:您的第二个要点说您有能力终止 运行 在子进程中的程序。请注意 subprocess.call
returns the return code of the subprocess. This means that you can let the main thread kill the subprocess, and just check the return code to see if you need to do any cleanup. Even better, you could use subprocess.check_call,如果 return 代码不为 0,这将为您引发异常。我不知道您正在使用哪个平台,但是在 Linux,被杀死的进程通常不会 return 如果它们被杀死则为 0。
它可能看起来像这样:
class Worker(threading.Thread):
def __init__(self, queue):
self.queue = queue
# more code here
def run(self):
while True:
try:
job = self.queue.get()
#do some work
subprocess.check_call("myprogram", timeout=my_timeout)
#do some more things
except (TimeoutExpired, subprocess.CalledProcessError):
#do some administration
self.queue.add(job)
请注意,如果您使用的是 Python 3.5,则可以改用 subprocess.run,并将 check
参数设置为 True
。
如果您强烈需要处理工作人员在 不是 运行 子流程时需要中断的情况,那么我认为您'您将不得不使用轮询循环,因为我认为 Python 中的线程不支持您正在寻找的行为。您可以使用 threading.Event 对象将 "stop working now" 伪信号从主线程传递给工作线程,并让工作线程定期检查该事件对象的状态。
如果您愿意考虑使用多个处理而不是线程,请考虑切换到 multiprocessing module, which would allow you to handle signals. There is more overhead to spawning full-blown subprocesses instead of threads, but you're essentially looking for signal-like asynchronous behavior, and I don't think Python's threading library supports anything like that. One benefit though, would be that you would be freed from the Global Interpreter Lock(PDF link),因此如果您的工作线程处理,您实际上可能会看到一些速度优势(以前的线程)正在做任何事情 CPU 密集。
我正在 linux 的 python 3.4 中创建一个带有 Web 前端的自定义作业调度程序。该程序创建一个守护进程(消费者)线程,等待 PriorityQueue 中的作业可用。这些作业可以通过将它们添加到队列的 Web 界面手动添加。当消费者线程找到工作时,它使用 subprocess.run 执行一个程序,并等待它完成。
工作线程的基本思想:
class Worker(threading.Thread):
def __init__(self, queue):
self.queue = queue
# more code here
def run(self):
while True:
try:
job = self.queue.get()
#do some work
proc = subprocess.run("myprogram", timeout=my_timeout)
#do some more things
except TimeoutExpired:
#do some administration
self.queue.add(job)
但是:
- 这个消费者应该能够从前端(主线程)接收到某种信号,它应该停止当前的作业,而是处理队列中的下一个作业(保存当前作业的状态并添加它再次到队列的末尾)。这可能(并且很可能)在 subprocess.run(). 上被阻止时发生
- 子进程可以简单地被杀死(执行的程序将状态保存在文件中)但是工作线程需要对被杀死的作业进行一些管理以确保它可以在以后恢复。
- 可以有多个这样的工作线程。
- 信号处理程序不是一个选项(因为它们总是由主线程处理,主线程是一个网络服务器,不应该被这个打扰)。
- 在这种情况下,拥有一个进程主动轮询事件(例如子进程退出、超时发生或中断事件)的事件循环并不是真正的解决方案,而是一种丑陋的 hack。这些工作对性能要求很高,并且不需要不断的上下文切换。
我应该使用什么同步原语来中断此线程或确保它以阻塞方式同时等待多个事件?
我认为您不小心忽略了一个简单的解决方案:您的第二个要点说您有能力终止 运行 在子进程中的程序。请注意 subprocess.call
returns the return code of the subprocess. This means that you can let the main thread kill the subprocess, and just check the return code to see if you need to do any cleanup. Even better, you could use subprocess.check_call,如果 return 代码不为 0,这将为您引发异常。我不知道您正在使用哪个平台,但是在 Linux,被杀死的进程通常不会 return 如果它们被杀死则为 0。
它可能看起来像这样:
class Worker(threading.Thread):
def __init__(self, queue):
self.queue = queue
# more code here
def run(self):
while True:
try:
job = self.queue.get()
#do some work
subprocess.check_call("myprogram", timeout=my_timeout)
#do some more things
except (TimeoutExpired, subprocess.CalledProcessError):
#do some administration
self.queue.add(job)
请注意,如果您使用的是 Python 3.5,则可以改用 subprocess.run,并将 check
参数设置为 True
。
如果您强烈需要处理工作人员在 不是 运行 子流程时需要中断的情况,那么我认为您'您将不得不使用轮询循环,因为我认为 Python 中的线程不支持您正在寻找的行为。您可以使用 threading.Event 对象将 "stop working now" 伪信号从主线程传递给工作线程,并让工作线程定期检查该事件对象的状态。
如果您愿意考虑使用多个处理而不是线程,请考虑切换到 multiprocessing module, which would allow you to handle signals. There is more overhead to spawning full-blown subprocesses instead of threads, but you're essentially looking for signal-like asynchronous behavior, and I don't think Python's threading library supports anything like that. One benefit though, would be that you would be freed from the Global Interpreter Lock(PDF link),因此如果您的工作线程处理,您实际上可能会看到一些速度优势(以前的线程)正在做任何事情 CPU 密集。