Deadlock in Python's multiprocessing upon early termination

I am creating a multiprocessing.Queue in Python and adding multiprocessing.Process instances to work off this Queue.

I would like to add a function call that is executed after each job and checks whether a specific task has succeeded. If it has, I want to empty the Queue and terminate execution.

My Process class is:

class Worker(multiprocessing.Process):

    def __init__(self, queue, check_success=None, directory=None, permit_nonzero=False):
        super(Worker, self).__init__()
        self.check_success = check_success
        self.directory = directory
        self.permit_nonzero = permit_nonzero
        self.queue = queue

    def run(self):
        for job in iter(self.queue.get, None):
            stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
            with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                f_out.write(stdout)
            if callable(self.check_success) and self.check_success(job):
                # Terminate all remaining jobs here
                pass

My Queue is set up here:

class LocalJobServer(object):

    @staticmethod
    def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
        if check_success and not callable(check_success):
            msg = "check_success option requires a callable function/object: {0}".format(check_success)
            raise ValueError(msg)

        # Create a new queue
        queue = multiprocessing.Queue()
        # Create workers equivalent to the number of jobs
        workers = []
        for _ in range(nproc):
            wp = Worker(queue, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
            wp.start()
            workers.append(wp)
        # Add each command to the queue
        for cmd in command:
            queue.put(cmd, timeout=time)
        # Stop workers from exiting without completion
        for _ in range(nproc):
            queue.put(None)
        for wp in workers:
            wp.join()

The function call mbkit.dispatch.cexectools.cexec() is a wrapper around subprocess.Popen and returns p.stdout.
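
For reference, a minimal sketch of what such a wrapper might look like; the real mbkit.dispatch.cexectools.cexec implementation may differ, and the signature and error handling below are assumptions based on how it is called above:

import subprocess

def cexec(cmd, directory=None, permit_nonzero=False):
    """Hypothetical sketch of a cexec-style wrapper around subprocess.Popen.

    Runs cmd in directory and returns the captured stdout. If the process
    exits with a non-zero status and permit_nonzero is False, raise.
    """
    p = subprocess.Popen(cmd, cwd=directory, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, universal_newlines=True)
    stdout, _ = p.communicate()
    if p.returncode != 0 and not permit_nonzero:
        raise RuntimeError("Command {0} exited with {1}".format(cmd, p.returncode))
    return stdout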

In the Worker class, I wrote the condition that checks whether a job was successful, and tried to empty the remaining jobs in the Queue with a while loop, i.e. my Worker.run() function looks like this:

def run(self):
    for job in iter(self.queue.get, None):
        stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
        with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
            f_out.write(stdout)
        if callable(self.check_success) and self.check_success(job):
            break
    while not self.queue.empty():
        self.queue.get()

Although this sometimes works, it usually deadlocks and my only option is Ctrl-C. I am aware that .empty() is unreliable, hence my question.

Any advice on how to implement this kind of early-termination functionality?

This may not be the optimal solution, and any other suggestions are much appreciated, but I managed to solve the problem like this:

class Worker(multiprocessing.Process):
    """Simple manual worker class to execute jobs in the queue"""

    def __init__(self, queue, success, check_success=None, directory=None, permit_nonzero=False):
        super(Worker, self).__init__()
        self.check_success = check_success
        self.directory = directory
        self.permit_nonzero = permit_nonzero
        self.success = success
        self.queue = queue

    def run(self):
        """Method representing the process's activity"""
        for job in iter(self.queue.get, None):
            if self.success.value:
                continue
            stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
            with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
                f_out.write(stdout)
            if callable(self.check_success) and self.check_success(job):
                self.success.value = int(True)
            time.sleep(1)


class LocalJobServer(object):
    """A local server to execute jobs via the multiprocessing module"""

    @staticmethod
    def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
        if check_success and not callable(check_success):
            msg = "check_success option requires a callable function/object: {0}".format(check_success)
            raise ValueError(msg)

        # Create a new queue
        queue = multiprocessing.Queue()
        success = multiprocessing.Value('i', int(False))
        # Create workers equivalent to the number of jobs
        workers = []
        for _ in range(nproc):
            wp = Worker(queue, success, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
            wp.start()
            workers.append(wp)
        # Add each command to the queue
        for cmd in command:
            queue.put(cmd)
        # Stop workers from exiting without completion
        for _ in range(nproc):
            queue.put(None)
        # Wait for the workers to finish
        for wp in workers:
            wp.join(time)

Basically I am creating a Value and providing that to each Process. Once a job is marked as successful, this variable gets updated. Each Process checks with if self.success.value: continue whether we already have a success and, if so, just iterates over the remaining jobs in the Queue until it is empty.

The time.sleep(1) call is needed to account for potential synchronisation delays between the processes. It is certainly not the most efficient approach, but it works.
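
For illustration, a minimal usage sketch of this API; the command list and the check_success callable below are made up for the example:

import os

def success_if_log_contains_done(job):
    """Toy check_success callable: treat a job as successful if its log contains DONE."""
    log = job.rsplit('.', 1)[0] + '.log'
    return os.path.isfile(log) and 'DONE' in open(log).read()

# Hypothetical list of executable job scripts
commands = ['job_1.sh', 'job_2.sh', 'job_3.sh']
LocalJobServer.sub(commands, check_success=success_if_log_contains_done, nproc=2)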

There is no deadlock here. It is simply a consequence of the behaviour of multiprocessing.Queue: the get method is blocking by default. So when you call get on an empty queue, the call stalls, waiting for the next element to become ready. You can see that some of your workers will stall because, when you use your while not self.queue.empty() loop to empty it, you remove all the None sentinels, and some of your workers will then block on an empty Queue, as in this code:

from multiprocessing import Queue

q = Queue()
# q is empty, so q.get() blocks forever waiting for an element
for e in iter(q.get, None):
    print(e)

To be notified when the queue is empty, you need to use a non-blocking call. You can for instance use q.get_nowait, or use a timeout with q.get(timeout=1). Both throw a multiprocessing.queues.Empty exception when the queue is empty. So you should replace your Worker's for job in iter(...): loop with something like:

while not queue.empty():
    try:
        job = queue.get(timeout=.1)
    except multiprocessing.queues.Empty:
        # Another worker emptied the queue in the meantime; check again
        continue
    # Do stuff with your job

if you do not want to get stuck at any point.

For the synchronisation part, I would recommend using a synchronisation primitive such as multiprocessing.Condition or multiprocessing.Event. This is cleaner than a Value, as they were designed for that purpose. Something like this should help:

def run(self):
    while not self.queue.empty():
        try:
            job = self.queue.get(timeout=.1)
        except multiprocessing.queues.Empty:
            continue
        if self.event.is_set():
            continue
        stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
        with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
            f_out.write(stdout)
        if callable(self.check_success) and self.check_success(job):
            self.event.set()
    print("Worker {} terminated cleanly".format(self.name))

with event = multiprocessing.Event().
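
The driver side would then create the Event and hand it to every Worker, roughly as sketched below; this mirrors the sub method above and assumes the Worker stores the Event as self.event:

# Minimal driver sketch, assuming Worker(queue, event, ...) stores the Event as self.event
queue = multiprocessing.Queue()
event = multiprocessing.Event()

# Enqueue the jobs first so the queue is not empty when the workers start
for cmd in command:
    queue.put(cmd)

workers = []
for _ in range(nproc):
    wp = Worker(queue, event, check_success=check_success,
                directory=directory, permit_nonzero=permit_nonzero)
    wp.start()
    workers.append(wp)

# No None sentinels are needed: each worker exits once the queue runs dry
for wp in workers:
    wp.join()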

Note that it is also possible to use a multiprocessing.Pool to avoid dealing with the queue and the workers yourself. But as you need some synchronisation primitives, it might be a bit more complicated to set up. Something like this should work:

def worker(job, success, check_success=None, directory=None, permit_nonzero=False):
    if success.is_set():
        return False
    stdout = mbkit.dispatch.cexectools.cexec([job], directory=directory, permit_nonzero=permit_nonzero)
    with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
        f_out.write(stdout)
    if callable(check_success) and check_success(job):
        success.set()
    return True

# ......
# In the class LocalJobServer
# .....

def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False):

    mgr = multiprocessing.Manager()
    success = mgr.Event()

    pool = multiprocessing.Pool(nproc)
    run_args = [(cmd, success, check_success, directory, permit_nonzero) for cmd in command]
    result = pool.starmap(worker, run_args)

    pool.close()
    pool.join()

Note that I am using a Manager here because you cannot pass a multiprocessing.Event directly as an argument to pool workers. You could also use the initializer and initargs parameters of Pool to set up a global success event in each worker and avoid relying on the Manager, but it is slightly more complicated.
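
For completeness, a rough sketch of that initializer-based variant; the helper names are hypothetical, and the idea is that each pool process receives the Event once at start-up and keeps it in a module-level global:

_success = None  # set in each pool process by _init_worker

def _init_worker(event):
    # Runs once in every pool process; stashes the shared Event globally
    global _success
    _success = event

def worker(job, check_success=None, directory=None, permit_nonzero=False):
    if _success.is_set():
        return False
    stdout = mbkit.dispatch.cexectools.cexec([job], directory=directory, permit_nonzero=permit_nonzero)
    with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
        f_out.write(stdout)
    if callable(check_success) and check_success(job):
        _success.set()
    return True

def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False):
    success = multiprocessing.Event()
    # The Event is passed at process creation time, so no Manager is required
    pool = multiprocessing.Pool(nproc, initializer=_init_worker, initargs=(success,))
    run_args = [(cmd, check_success, directory, permit_nonzero) for cmd in command]
    pool.starmap(worker, run_args)
    pool.close()
    pool.join()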