使用 python 多进程时在 (CLOSE, TERMINATE) 中断言 self._state

assert self._state in (CLOSE, TERMINATE) when using python multiprocess

我目前正在尝试使用 python 多处理。我使用的库是 multiprocess(不是 multiprocessing)。

我有以下代码,它创建了许多计算作业,并通过映射操作运行实现它:

pool = multiprocess.Pool(4)
all_responses = pool.map_async(wrapper_singlerun, range(10000))
pool.join()
pool.close()

但是,每当我 运行 这段代码时,我都会收到以下错误:

    pool.join()
  File "/Users/davidal/miniconda3/lib/python3.6/site-packages/multiprocess/pool.py", line 509, in join
    assert self._state in (CLOSE, TERMINATE)
AssertionError

你知道为什么会发生这个错误吗?我以前用过 pool.map_async,但我想我需要有一个 pool rendez-vous 命令。否则,我的 PC 创建了类似 forkbomb 的东西,它创建了太多线程(至少,我认为它是这样的......)

欢迎任何想法!

问题是您在 close 之前调用 join

multiprocess appears to be missing its documentation,但是,据我所知,它基本上是 stdlib multiprocessing 的一个分支,pre-monkeypatches dill in for pickle,所以multiprocessing 文档在这里应该是相关的。 (此外,在评论中,您说您可以使用 multiprocessing 重现该问题。)

所以,Pool.join 说:

Wait for the worker processes to exit. One must call close() or terminate() before using join().

close 方法是关闭队列发送端的方法,因此无法添加新任务。 join 方法是您等待处理队列中所有内容的方式。在关闭之前等待队列耗尽是行不通的。

但是您是在 join 之后而不是之前调用 closethe first thing join doesassert,你已经调用了 closeterminate,你还没有调用,因此断言失败。

所以,您可能只想调换这两个调用的顺序。

或者,也许您对 join 的用途感到困惑,并认为您需要先调用它才能使用 all_responses.get().wait()。如果是这样——你不需要那样做; get 将阻塞直到结果可用,之后您就不需要 join。这实际上更常见,尤其是 map 和朋友(尽管文档中的示例通过 with Pool(…) as pool: 而不是手动调用池中的任何东西)。