Python multiprocessing.Pool.map dying silently
I'm trying to parallelize a for loop to speed up some code. Consider this:
from multiprocessing import Pool

results = []

def do_stuff(str):
    print str
    results.append(str)

p = Pool(4)
p.map(do_stuff, ['str1','str2','str3',...]) # many strings here ~ 2000
p.close()
print results
I print some debug messages from do_stuff to track how far the program gets before dying. It seems to die at a different point each time. For example, it will print 'str297' and then just stop running; I see all the CPUs stop working and the program just sits there. Some error must be occurring, but no error message is shown. Does anyone know how to debug this?
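For reference, one minimal way to make a worker error visible (a sketch, not the original code, with a placeholder worker) is to wrap the worker function in a try/except that prints the traceback from inside the worker process itself, instead of relying on the parent to report it:

import traceback
from multiprocessing import Pool

def do_stuff(text):
    return text          # placeholder for the real work

def safe_do_stuff(text):
    # Print any exception from inside the worker process immediately,
    # rather than waiting for the parent process to re-raise it.
    try:
        return do_stuff(text)
    except Exception:
        traceback.print_exc()
        raise

if __name__ == '__main__':
    p = Pool(4)
    p.map(safe_do_stuff, ['str{}'.format(i) for i in range(2000)])
    p.close()
    p.join()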
UPDATE
I tried re-writing the code a little bit. Instead of using the map function, I tried the apply_async function like this:
pool = Pool(5)
# results = pool.map(do_sym, underlyings[0::10])   # the earlier map-based call
results = []
for sym in underlyings[0::10]:
    r = pool.apply_async(do_sym, [sym])
    results.append(r)
pool.close()
pool.join()

for result in results:
    print result.get(timeout=1000)
This works just as well as the map function, but it ends up hanging in the same way. It never gets to the for loop that prints the results.
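One way to keep a hang like this from blocking forever (a sketch, with hypothetical do_sym and underlyings standing in for the real ones) is to skip pool.join() and fetch each result with a short timeout, so that a stuck worker raises multiprocessing.TimeoutError instead of blocking the main process indefinitely:

from multiprocessing import Pool, TimeoutError

def do_sym(sym):
    return sym           # placeholder for the real work

if __name__ == '__main__':
    underlyings = ['sym{}'.format(i) for i in range(100)]  # hypothetical data
    pool = Pool(5)
    results = [pool.apply_async(do_sym, [sym]) for sym in underlyings[0::10]]
    pool.close()
    # Fetch with a timeout *before* join(): if a worker is stuck, get()
    # raises multiprocessing.TimeoutError rather than blocking forever.
    for r in results:
        try:
            print(r.get(timeout=60))
        except TimeoutError:
            print('task timed out -- a worker is probably hung')
    pool.join()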
After working on this some more, and trying some debug logging as suggested in unutbu's answer, I will give some more info here. The problem is very weird. It seems like the pool just hangs there, unable to close and continue the program. I was using the PyDev environment to test my program, but I figured I would try running python in the console. In the console I got the same behavior, but when I pressed control+C to kill the program, I got some output which might explain where the problem is:
> KeyboardInterrupt ^CProcess PoolWorker-47: Traceback (most recent call
> last): File "/usr/lib/python2.7/multiprocessing/process.py", line
> 258, in _bootstrap Process PoolWorker-48: Traceback (most recent call
> last): File "/usr/lib/python2.7/multiprocessing/process.py", line
> 258, in _bootstrap Process PoolWorker-45: Process PoolWorker-46:
> Process PoolWorker-44:
> self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
> self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> Traceback (most recent call last): Traceback (most recent call last):
> Traceback (most recent call last): File
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in
> _bootstrap File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap File
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in
> _bootstrap
> task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
> self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
> racquire()
> self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> KeyboardInterrupt
> task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
> self.run()
> self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
> self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run File
> "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
> self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> self._target(*self._args, **self._kwargs)
> self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> racquire() File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker KeyboardInterrupt
> task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
> task = get()
> task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
> File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
> racquire()
> return recv()
> racquire() KeyboardInterrupt KeyboardInterrupt KeyboardInterrupt
So in fact the program never dies. I ended up having to close the terminal window to kill it.
UPDATE 2
I narrowed the problem down to something inside the function being run in the pool: it was a MySQL database transaction that was causing the problem. I was using the MySQLdb package before. I switched the transaction over to the pandas.read_sql function, and it works now.
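A plausible explanation (an assumption, but consistent with the symptoms) is that a MySQL connection created before the Pool forked was inherited and shared by all the workers, which can deadlock on the connection's internal state. A common fix is to open a fresh connection inside each worker via the Pool's initializer hook; the connection parameters and query below are hypothetical:

from multiprocessing import Pool
import MySQLdb

conn = None  # one connection per worker process, created after the fork

def init_worker():
    # Each worker opens its own connection instead of inheriting the
    # parent's; these connection parameters are hypothetical.
    global conn
    conn = MySQLdb.connect(host='localhost', user='me',
                           passwd='secret', db='quotes')

def do_sym(sym):
    # Hypothetical query standing in for the real transaction.
    cur = conn.cursor()
    cur.execute("SELECT price FROM quotes WHERE symbol = %s", (sym,))
    return cur.fetchall()

if __name__ == '__main__':
    pool = Pool(5, initializer=init_worker)
    print(pool.map(do_sym, ['AAPL', 'GOOG']))  # hypothetical symbols
    pool.close()
    pool.join()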
pool.map returns the results in a list. So rather than calling results.append in the concurrent processes (which will not work, since each process gets its own independent copy of results), assign results to the value returned by pool.map in the main process:
import multiprocessing as mp

def do_stuff(text):
    return text

if __name__ == '__main__':
    p = mp.Pool(4)
    tasks = ['str{}'.format(i) for i in range(2000)]
    results = p.map(do_stuff, tasks)
    p.close()
    print(results)
yields
['str0', 'str1', 'str2', 'str3', ...]
One way to debug scripts that use multiprocessing is to add logging statements. The multiprocessing module provides a helper function, mp.log_to_stderr, for this purpose. For example,
import multiprocessing as mp
import logging

logger = mp.log_to_stderr(logging.DEBUG)

def do_stuff(text):
    logger.info('Received {}'.format(text))
    return text

if __name__ == '__main__':
    p = mp.Pool(4)
    tasks = ['str{}'.format(i) for i in range(2000)]
    results = p.map(do_stuff, tasks)
    p.close()
    logger.info(results)
which produces logging output like:
[DEBUG/MainProcess] created semlock with handle 139824443588608
[DEBUG/MainProcess] created semlock with handle 139824443584512
[DEBUG/MainProcess] created semlock with handle 139824443580416
[DEBUG/MainProcess] created semlock with handle 139824443576320
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-1] Received str0
[INFO/PoolWorker-2] Received str125
[INFO/PoolWorker-3] Received str250
[INFO/PoolWorker-4] Received str375
[INFO/PoolWorker-3] Received str251
...
[INFO/PoolWorker-4] Received str1997
[INFO/PoolWorker-4] Received str1998
[INFO/PoolWorker-4] Received str1999
[DEBUG/MainProcess] closing pool
[INFO/MainProcess] ['str0', 'str1', 'str2', 'str3', ...]
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] task handler exiting
[DEBUG/PoolWorker-3] worker exiting after 2 tasks
[INFO/PoolWorker-3] process shutting down
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] terminating workers
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] cleaning up worker 4811
[DEBUG/MainProcess] running the remaining "atexit" finalizers
Notice that each line indicates which process emitted the logging record. The output thus serializes, to some extent, the order of events among your concurrent processes.
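As an aside, the first tasks in the log (str0, str125, str250, str375) also show how map batches work: it splits the iterable into chunks before handing them out. A small sketch making that chunking explicit:

import multiprocessing as mp

def do_stuff(text):
    return text

if __name__ == '__main__':
    p = mp.Pool(4)
    tasks = ['str{}'.format(i) for i in range(2000)]
    # With 2000 tasks and 4 workers, the default heuristic
    # divmod(len(tasks), 4 * len(pool)) gives a chunksize of 125,
    # which is why PoolWorker-2's first task in the log is str125.
    # Passing chunksize explicitly makes the batching deterministic.
    results = p.map(do_stuff, tasks, chunksize=125)
    p.close()
    p.join()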
With judicious placement of logging.info calls, you should be able to narrow down where, and maybe why, your script is "dying silently" (or, at the very least, it won't be quite so silent as it dies).
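Building on that, here is a sketch (with a deliberately failing task added purely for illustration) that logs the full traceback from inside the worker, so the log records exactly which task and which process died:

import logging
import traceback
import multiprocessing as mp

logger = mp.log_to_stderr(logging.INFO)

def do_stuff(text):
    try:
        if text == 'str13':                      # deliberate failure
            raise ValueError('bad input: ' + text)
        return text
    except Exception:
        logger.error(traceback.format_exc())     # logged by the worker itself
        raise

if __name__ == '__main__':
    p = mp.Pool(4)
    try:
        results = p.map(do_stuff, ['str{}'.format(i) for i in range(20)])
    except ValueError:
        logger.info('map re-raised the worker exception in the parent')
    p.close()
    p.join()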