Python multiprocessing: imported functions not ending
While experimenting with the multiprocessing library (Python 3.5), I ran into functions that never finish. Everything appears to get processed, but the (main) program does not continue...
My current setup is as follows:
# Main.py
import multiprocessing as mp
import pandas as pd
from dosomething import dosomething
csvfolder = 'data/'
data = pd.DataFrame([
    {'a': 12, 'b': 13},
    {'a': 2, 'b': 14},
    {'a': 1, 'b': 23},
    {'a': 123, 'b': 16},
    {'a': 142, 'b': 14},
])
print('start')
result = mp.Queue()
dos = mp.Process(target=dosomething, args=(data,csvfolder,result,'dosomething'))
dos.start()
dos.join()
result.get()
print('finished')
Then in dosomething.py I define a function dosomething that does the following:
# dosomething.py
import os
def dosomething(data, csvfolder, result, name):
    data.to_csv(os.path.join(csvfolder, 'test.csv'))
    result.put({name: {'data': data}})
The function appears to run as expected but never ends, which stalls the main program. When I kill the program, I get the following message:
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Based on the comments, I learned that result.put() takes a long time (when using the real data) and becomes unresponsive. The result I put on this queue is a dictionary with 2 elements, one of which is a pandas DataFrame (containing 100,000 records).
How can I solve this?
From the multiprocessing programming guidelines:
... whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined.
Simply swap the Process.join and Queue.get lines. A put() of a large object only completes once the queue's feeder thread can flush the pickled data through the underlying pipe, which requires the parent to read from the queue; joining the child before calling get() therefore deadlocks.