Graceful cleanup for multiprocess pool python

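I am running a multiprocessing pool, mapped over a number of inputs. My worker processes have an initialization step that opens a connection to selenium and to a database. When the pool finishes its work, what is the graceful way to close these connections, rather than just relying on Python's memory management and __del__ definitions?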

EDIT:

import multiprocessing


class WebDriver:
    def close(self):
        # close logic
        ...

    def __del__(self):
        self.driver.close()


def init():
    global DRIVER
    DRIVER = WebDriver()


def shutdown():
    DRIVER.close()


if __name__ == '__main__':
    # some_function and some_args are defined elsewhere
    with multiprocessing.Pool(initializer=init) as pool:
        pool.map(some_function, some_args)

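Because some_args is large, I only want to call shutdown() once the worker processes have no more work to do. I don't want to close and reopen the connections to my database until everything is done.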

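As of now, I expect the memory manager to call __del__ when a worker process shuts down, but I don't know whether that actually happens. I have run into strange scenarios where it was not called. I would like to understand better how to manage shutdown.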
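A plausible cause of those strange scenarios: the Python language reference states that it is not guaranteed that __del__() methods are called for objects that still exist when the interpreter exits, and os._exit() skips interpreter cleanup entirely. With the fork start method, multiprocessing ends each child process with os._exit(), so a module-level DRIVER's __del__ may never run in a worker. Leaving a "with multiprocessing.Pool(...)" block also calls terminate(), which stops the workers without letting them clean up. The small script below (CHILD_SOURCE and run_child are illustrative names, not from the question) demonstrates the os._exit() case in a fresh interpreter.

```python
import subprocess
import sys

# Child program: a module-level object whose __del__ reports that it ran.
CHILD_SOURCE = """
import os
import sys

class Resource:
    def __del__(self):
        print("__del__ ran", flush=True)

resource = Resource()
if sys.argv[1] == "hard":
    # Skip interpreter cleanup, as a forked multiprocessing child does.
    os._exit(0)
"""


def run_child(mode):
    """Run CHILD_SOURCE in a fresh interpreter and return its stdout."""
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE, mode],
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout


if __name__ == "__main__":
    print("normal exit:", repr(run_child("normal")))
    print("os._exit():", repr(run_child("hard")))
```

In CPython the normal-exit run usually prints "__del__ ran", but the language does not promise it; the os._exit() run prints nothing.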

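I think there is a good chance the driver will be closed if you first wait for the pool processes to terminate and then force a garbage collection: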

import gc
import multiprocessing

# init, some_function and some_args as in the question

if __name__ == '__main__':
    with multiprocessing.Pool(initializer=init) as pool:
        try:
            pool.map(some_function, some_args)
        finally:
            # Wait for all tasks to complete and all processes to terminate:
            pool.close()
            pool.join()
            # Processes should be done now:
            gc.collect()  # force a collection (note: this runs in the main process only)
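Another option, not taken from either answer here: the standard library's multiprocessing.util module contains a Finalize helper that runs a callback when a worker process exits normally. It is undocumented, so treat relying on it as an assumption that could change between Python versions. The minimal sketch below registers DRIVER.close in the pool initializer, so each worker closes its connection exactly once, after it has finished all of its tasks. WebDriver, the shared counter, some_function and run() are hypothetical stand-ins; the counter exists only so the effect can be checked.

```python
import multiprocessing
# Undocumented stdlib helper (assumption: behaves as in current CPython).
from multiprocessing.util import Finalize


class WebDriver:
    """Hypothetical stand-in for a selenium driver plus DB connection."""

    def __init__(self, closed_counter):
        # Shared counter, used here only to show that close() ran.
        self.closed_counter = closed_counter

    def close(self):
        # Real code would quit the driver / close the connection here.
        with self.closed_counter.get_lock():
            self.closed_counter.value += 1


def init(closed_counter):
    global DRIVER
    DRIVER = WebDriver(closed_counter)
    # Ask multiprocessing to call DRIVER.close() when this worker exits
    # normally; a non-None exitpriority makes it run at process exit.
    Finalize(DRIVER, DRIVER.close, exitpriority=16)


def some_function(i):
    # Hypothetical task; real code would use DRIVER here.
    return i * i


def run(n_workers=4):
    closed = multiprocessing.Value('i', 0)
    with multiprocessing.Pool(n_workers, initializer=init,
                              initargs=(closed,)) as pool:
        results = pool.map(some_function, range(16))
        # close() + join() let the workers exit normally so their finalizers
        # run; without them, leaving the with block would call terminate().
        pool.close()
        pool.join()
    return results, closed.value


if __name__ == '__main__':
    results, n_closed = run()
    print(results, n_closed)
```

The finalizer does not run if a worker is killed (by terminate() or a crash), which is why close() and join() are called before the with block ends.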

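Solution using a user-created pool: each worker process opens its own WebDriver in a with block and processes arguments from a shared queue until it reads a None sentinel; leaving the with block then calls close() exactly once per worker.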

import multiprocessing
import time


class WebDriver:

    def close(self):
        ...  # close the real driver / database connection here
        print('driver is now closed')

    def do_something(self, i):
        time.sleep(.1)
        print(i, flush=True)

    def __enter__(self):
        self.driver = []  # this would be an actual driver
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Runs when the with block ends, even if it ends with an exception:
        self.close()


def some_function(i):
    # Do something with DRIVER:
    ...
    DRIVER.do_something(i)


def worker(in_q):
    global DRIVER

    with WebDriver() as DRIVER:
        # Iterate until we get the special None record and then clean up:
        for i in iter(in_q.get, None):
            try:
                some_function(i)
            except Exception:
                # Skip a failing item so the worker keeps going;
                # real code should at least log the error.
                pass

if __name__=='__main__':
    POOL_SIZE = multiprocessing.cpu_count()
    # Create pool:
    # Assumption is that we don't need an output queue for output
    in_q = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q,))
                 for _ in range(POOL_SIZE)
                 ]
    for p in processes:
        p.start()
    # Write arguments to input_queue:
    some_args = range(16)
    for arg in some_args:
        in_q.put(arg)
    # Now write POOL_SIZE "quit" messages:
    for _ in range(POOL_SIZE):
        in_q.put(None)
    # Wait for processes to terminate:
    for p in processes:
        p.join()
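The answer above assumes no output queue is needed. If the results are needed back in the parent, a minimal sketch of the same pattern with a second queue could look like this; WebDriver is again a hypothetical stand-in and run() is an illustrative helper, not part of the original answer. Results are read before join() because, per the multiprocessing docs on joining processes that use queues, a child that has put items on a queue will not terminate until those items have been flushed to the pipe, which can block if nobody reads them.

```python
import multiprocessing


class WebDriver:
    """Hypothetical stand-in for the real driver / connection."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Real code would close the selenium driver / DB connection here.
        pass

    def do_something(self, i):
        return i * i


def worker(in_q, out_q):
    with WebDriver() as driver:
        # Stop at the None sentinel; __exit__ then closes the driver.
        for i in iter(in_q.get, None):
            try:
                out_q.put((i, driver.do_something(i)))
            except Exception as e:
                out_q.put((i, e))  # report the failure instead of hiding it


def run(args, pool_size=4):
    in_q, out_q = multiprocessing.Queue(), multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q, out_q))
                 for _ in range(pool_size)]
    for p in processes:
        p.start()
    for arg in args:
        in_q.put(arg)
    for _ in range(pool_size):
        in_q.put(None)  # one "quit" message per worker
    # Drain exactly one result per input *before* joining the workers.
    results = [out_q.get() for _ in range(len(args))]
    for p in processes:
        p.join()
    return sorted(results)


if __name__ == '__main__':
    print(run(list(range(16))))
```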

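Prints (one run with 8 worker processes, i.e. cpu_count() was 8; the interleaving of lines varies from run to run):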

0
1
2
3
4
5
6
7
8
driver is now closed
9
driver is now closed
10
driver is now closed
11
driver is now closed
12
driver is now closed
14
13
driver is now closed
driver is now closed
15
driver is now closed