Deadlock in Python's multiprocessing upon early termination
I am creating a multiprocessing.Queue in Python and adding multiprocessing.Process instances to this Queue.

I would like to add a function call that is executed after every job, which checks whether a specific task has succeeded. If so, I want to empty the Queue and terminate execution.
My Process class is:
class Worker(multiprocessing.Process):

    def __init__(self, queue, check_success=None, directory=None, permit_nonzero=False):
        super(Worker, self).__init__()
        self.check_success = check_success
        self.directory = directory
        self.permit_nonzero = permit_nonzero
        self.queue = queue

    def run(self):
        for job in iter(self.queue.get, None):
            stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
            with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                f_out.write(stdout)
            if callable(self.check_success) and self.check_success(job):
                # Terminate all remaining jobs here
                pass
My Queue setup is:
class LocalJobServer(object):

    @staticmethod
    def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
        if check_success and not callable(check_success):
            msg = "check_success option requires a callable function/object: {0}".format(check_success)
            raise ValueError(msg)

        # Create a new queue
        queue = multiprocessing.Queue()

        # Create workers equivalent to the number of jobs
        workers = []
        for _ in range(nproc):
            wp = Worker(queue, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
            wp.start()
            workers.append(wp)

        # Add each command to the queue
        for cmd in command:
            queue.put(cmd, timeout=time)

        # Stop workers from exiting without completion
        for _ in range(nproc):
            queue.put(None)

        for wp in workers:
            wp.join()
The function call mbkit.dispatch.cexectools.cexec() is a wrapper around subprocess.Popen and returns p.stdout.
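For reference, a minimal sketch of what such a wrapper might look like is shown below; the real mbkit.dispatch.cexectools.cexec is not shown in the question, so the signature and error handling here are assumptions:

import subprocess

def cexec(cmd, directory=None, permit_nonzero=False):
    """Hypothetical stand-in for mbkit.dispatch.cexectools.cexec: run cmd
    via subprocess.Popen in `directory` and return its stdout as a string."""
    p = subprocess.Popen(cmd, cwd=directory, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, universal_newlines=True)
    stdout, _ = p.communicate()
    if p.returncode != 0 and not permit_nonzero:
        raise RuntimeError("{0} exited with code {1}".format(cmd, p.returncode))
    return stdout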
In the Worker class, I wrote the condition that checks whether a job was successful and tried to empty the remaining jobs in the Queue with a while loop, i.e. my Worker.run() function looks like this:
def run(self):
    for job in iter(self.queue.get, None):
        stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
        with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
            f_out.write(stdout)
        if callable(self.check_success) and self.check_success(job):
            break
    while not self.queue.empty():
        self.queue.get()
Although this works sometimes, it usually deadlocks and my only option is Ctrl-C. I am aware that .empty() is unreliable, hence my question.

Any advice on how to implement this kind of early-termination functionality?
This may not be the optimal solution, and any other suggestions are much appreciated, but I managed to solve the problem like this:
class Worker(multiprocessing.Process):
    """Simple manual worker class to execute jobs in the queue"""

    def __init__(self, queue, success, check_success=None, directory=None, permit_nonzero=False):
        super(Worker, self).__init__()
        self.check_success = check_success
        self.directory = directory
        self.permit_nonzero = permit_nonzero
        self.success = success
        self.queue = queue

    def run(self):
        """Method representing the process's activity"""
        for job in iter(self.queue.get, None):
            if self.success.value:
                continue
            stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
            with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                f_out.write(stdout)
            if callable(self.check_success) and self.check_success(job):
                self.success.value = int(True)
        time.sleep(1)
class LocalJobServer(object):
    """A local server to execute jobs via the multiprocessing module"""

    @staticmethod
    def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
        if check_success and not callable(check_success):
            msg = "check_success option requires a callable function/object: {0}".format(check_success)
            raise ValueError(msg)

        # Create a new queue
        queue = multiprocessing.Queue()
        success = multiprocessing.Value('i', int(False))

        # Create workers equivalent to the number of jobs
        workers = []
        for _ in range(nproc):
            wp = Worker(queue, success, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
            wp.start()
            workers.append(wp)

        # Add each command to the queue
        for cmd in command:
            queue.put(cmd)

        # Stop workers from exiting without completion
        for _ in range(nproc):
            queue.put(None)

        # Wait for the workers to finish
        for wp in workers:
            wp.join(time)
Basically I am creating a Value and providing that to each Process. Once a job is marked as successful, this variable gets updated. Each Process checks in if self.success.value: continue whether we already have a success and, if so, just iterates over the remaining jobs in the Queue until it is empty.

The time.sleep(1) call is needed to account for potential synchronization delays between the processes. This is certainly not the most efficient approach, but it works.
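For illustration, a hypothetical call to this implementation might look like the following; the job scripts and the success criterion are placeholders, not part of the original code:

import os

def check_job(job):
    """Placeholder success criterion: the job's log file mentions CONVERGED."""
    log = job.rsplit('.', 1)[0] + '.log'
    if not os.path.isfile(log):
        return False
    with open(log) as f_in:
        return 'CONVERGED' in f_in.read()

LocalJobServer.sub(['job1.sh', 'job2.sh', 'job3.sh'], check_success=check_job, nproc=2)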
There is no deadlock here. It is just linked to the behaviour of multiprocessing.Queue, whose get method is blocking by default. So when you call get on an empty queue, the call stalls, waiting for the next element to be ready. You can see that some of your workers will stall because, when you use your while not self.queue.empty() loop to empty it, you remove all the None sentinels and some of your workers will block on an empty Queue, like in this code:
from multiprocessing import Queue

q = Queue()
for e in iter(q.get, None):
    print(e)
To get notified when the queue is empty, you need to use a non-blocking call. You can for instance use q.get_nowait, or use a timeout in q.get(timeout=1). Both throw a multiprocessing.queues.Empty exception when the queue is empty. So you should replace your Worker's for job in iter(...): loop with something like:
while not queue.empty():
    try:
        job = queue.get(timeout=.1)
    except multiprocessing.queues.Empty:
        continue
    # Do stuff with your job
if you do not want to get stuck at any point.

For the synchronization part, I would recommend using a synchronization primitive such as multiprocessing.Condition or a multiprocessing.Event. This is cleaner than a Value, as they are designed for this purpose. Something like this should help:
def run(self):
    while not self.queue.empty():
        try:
            job = self.queue.get(timeout=.1)
        except multiprocessing.queues.Empty:
            continue
        if self.event.is_set():
            continue
        stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
        with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
            f_out.write(stdout)
        if callable(self.check_success) and self.check_success(job):
            self.event.set()
    print("Worker {} terminated cleanly".format(self.name))
with event = multiprocessing.Event().
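For completeness, a hedged sketch of how the submitting side could wire this up, assuming Worker stores the Event as self.event and uses the run() shown above; the queue is filled before the workers start so the while not self.queue.empty() loop does not exit immediately:

import multiprocessing

def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False):
    queue = multiprocessing.Queue()
    event = multiprocessing.Event()
    workers = [Worker(queue, event, check_success=check_success,
                      directory=directory, permit_nonzero=permit_nonzero)
               for _ in range(nproc)]
    # Fill the queue first, then start the workers
    for cmd in command:
        queue.put(cmd)
    for wp in workers:
        wp.start()
    for wp in workers:
        wp.join()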
Note that it is also possible to use a multiprocessing.Pool to avoid dealing with the queue and the workers yourself. But as you need some synchronization primitives, it might be slightly more complicated to set up. Something like this should work:
def worker(job, success, check_success=None, directory=None, permit_nonzero=False):
    if success.is_set():
        return False
    stdout = mbkit.dispatch.cexectools.cexec([job], directory=directory, permit_nonzero=permit_nonzero)
    with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
        f_out.write(stdout)
    if callable(check_success) and check_success(job):
        success.set()
    return True

# ......
# In the class LocalJobServer
# ......

def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False):
    mgr = multiprocessing.Manager()
    success = mgr.Event()
    pool = multiprocessing.Pool(nproc)
    run_args = [(cmd, success, check_success, directory, permit_nonzero) for cmd in command]
    result = pool.starmap(worker, run_args)
    pool.close()
    pool.join()
Note that I am using a Manager here, because you cannot pass a multiprocessing.Event directly as an argument. You could also use the initializer and initargs arguments of the Pool to initiate a global success event in each worker and avoid relying on the Manager, but it is slightly more complicated.
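A rough sketch of that initializer/initargs variant follows; the actual job execution is elided and replaced by a placeholder, and the global name is an assumption:

import multiprocessing

_success = None  # shared Event, set per worker process by the initializer

def _init_worker(event):
    """Pool initializer: store the shared Event in a module-level global."""
    global _success
    _success = event

def worker(job):
    if _success.is_set():
        return False
    # ... run the job here, e.g. via mbkit.dispatch.cexectools.cexec ...
    job_succeeded = True  # placeholder for a real check_success call
    if job_succeeded:
        _success.set()
    return job_succeeded

if __name__ == '__main__':
    success = multiprocessing.Event()
    # The Event reaches the workers through the Process creation machinery,
    # so no Manager is needed here.
    pool = multiprocessing.Pool(2, initializer=_init_worker, initargs=(success,))
    results = pool.map(worker, ['job1.sh', 'job2.sh'])
    pool.close()
    pool.join()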