Python multiprocessing - 检查每个进程的状态
Python multiprocessing - check status of each processes
我想知道是否可以检查每个进程需要多长时间。
例如,有四名工人,工作应该不超过 10 秒,但其中一名工人需要超过 10 seconds.Is 才能在 10 秒后和进程完成工作之前发出警报。
我最初的想法是使用管理器,但似乎我必须等到流程完成。
非常感谢。
我很久以前(在 Whosebug 的某个地方)找到了这个解决方案,我对此非常满意。
基本上,如果进程花费的时间超过预期,它会使用 signal 引发异常。
您需要做的就是将此 class 添加到您的代码中:
import signal
class Timeout:
def __init__(self, seconds=1, error_message='TimeoutError'):
self.seconds = seconds
self.error_message = error_message
def handle_timeout(self, signum, frame):
raise TimeoutError(self.error_message)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
以下是其工作原理的一般示例:
import time
with Timeout(seconds=3, error_message='JobX took too much time'):
try:
time.sleep(10) #your job
except TimeoutError as e:
print(e)
对于您的情况,我会将 with 语句添加到您的工作人员需要执行的工作中。然后你捕获异常并做你认为最好的事情。
或者,您可以定期检查进程是否处于活动状态:
timeout = 3 #seconds
start = time.time()
while time.time() - start < timeout:
if any(proces.is_alive() for proces in processes):
time.sleep(1)
else:
print('All processes done')
else:
print("Timeout!")
# do something
您可以在尝试加入后检查进程是否存活。不要忘记设置超时,否则它会等到作业完成。
这是给你的简单例子
from multiprocessing import Process
import time
def task():
import time
time.sleep(5)
procs = []
for x in range(2):
proc = Process(target=task)
procs.append(proc)
proc.start()
time.sleep(2)
for proc in procs:
proc.join(timeout=0)
if proc.is_alive():
print "Job is not finished!"
使用管道和消息
from multiprocessing import Process, Pipe
import numpy as np
caller, worker = Pipe()
val1 = ['der', 'die', 'das']
def worker_function(info):
print (info.recv())
for i in range(10):
print (val1[np.random.choice(3, 1)[0]])
info.send(['job finished'])
info.close()
def request(data):
caller.send(data)
task = Process(target=worker_function, args=(worker,))
if not task.is_alive():
print ("task is requested")
task.start()
if caller.recv() == ['job finished']:
task.join()
print ("finished")
if __name__ == '__main__':
data = {'input': 'here'}
request(data)
我想知道是否可以检查每个进程需要多长时间。
例如,有四名工人,工作应该不超过 10 秒,但其中一名工人需要超过 10 seconds.Is 才能在 10 秒后和进程完成工作之前发出警报。
我最初的想法是使用管理器,但似乎我必须等到流程完成。
非常感谢。
我很久以前(在 Whosebug 的某个地方)找到了这个解决方案,我对此非常满意。
基本上,如果进程花费的时间超过预期,它会使用 signal 引发异常。
您需要做的就是将此 class 添加到您的代码中:
import signal
class Timeout:
def __init__(self, seconds=1, error_message='TimeoutError'):
self.seconds = seconds
self.error_message = error_message
def handle_timeout(self, signum, frame):
raise TimeoutError(self.error_message)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
以下是其工作原理的一般示例:
import time
with Timeout(seconds=3, error_message='JobX took too much time'):
try:
time.sleep(10) #your job
except TimeoutError as e:
print(e)
对于您的情况,我会将 with 语句添加到您的工作人员需要执行的工作中。然后你捕获异常并做你认为最好的事情。
或者,您可以定期检查进程是否处于活动状态:
timeout = 3 #seconds
start = time.time()
while time.time() - start < timeout:
if any(proces.is_alive() for proces in processes):
time.sleep(1)
else:
print('All processes done')
else:
print("Timeout!")
# do something
您可以在尝试加入后检查进程是否存活。不要忘记设置超时,否则它会等到作业完成。
这是给你的简单例子
from multiprocessing import Process
import time
def task():
import time
time.sleep(5)
procs = []
for x in range(2):
proc = Process(target=task)
procs.append(proc)
proc.start()
time.sleep(2)
for proc in procs:
proc.join(timeout=0)
if proc.is_alive():
print "Job is not finished!"
使用管道和消息
from multiprocessing import Process, Pipe
import numpy as np
caller, worker = Pipe()
val1 = ['der', 'die', 'das']
def worker_function(info):
print (info.recv())
for i in range(10):
print (val1[np.random.choice(3, 1)[0]])
info.send(['job finished'])
info.close()
def request(data):
caller.send(data)
task = Process(target=worker_function, args=(worker,))
if not task.is_alive():
print ("task is requested")
task.start()
if caller.recv() == ['job finished']:
task.join()
print ("finished")
if __name__ == '__main__':
data = {'input': 'here'}
request(data)