python 多处理读取文件花费太多时间
python multiprocessing read file cost too much time
我的代码中有一个函数应该读取文件。每个文件大约 8M,但是读取速度太慢,为了改进我使用 multiprocessing.sadly,它似乎得到了 blocked.i想知道有什么方法可以帮助解决这个问题并提高阅读速度吗?
我的代码如下:
import multiprocessing as mp
import json
import os
def gainOneFile(filename):
file_from = open(filename)
json_str = file_from.read()
temp = json.loads(json_str)
print "load:",filename," len ",len(temp)
file_from.close()
return temp
def gainSortedArr(path):
arr = []
pool = mp.Pool(4)
for i in xrange(1,40):
abs_from_filename = os.path.join(path, "outputDict"+str(i))
result = pool.apply_async(gainOneFile,(abs_from_filename,))
arr.append(result.get())
pool.close()
pool.join()
arr = sorted(arr,key = lambda dic:len(dic))
return arr
和调用函数:
whole_arr = gainSortedArr("sortKeyOut/")
你有一些问题。首先,你没有并行化。你这样做:
result = pool.apply_async(gainOneFile,(abs_from_filename,))
arr.append(result.get())
一遍又一遍,分派一个任务,然后立即调用 .get()
等待它完成,然后再分派任何其他任务;你实际上从来没有同时拥有超过一名工人运行。存储所有结果而不调用 .get()
,然后稍后调用 .get()
。或者只使用 Pool.map
或相关方法,为自己省去手动个人结果管理的一些麻烦,例如(使用 imap_unordered
来最小化开销,因为你只是在排序):
# Make generator of paths to load
paths = (os.path.join(path, "outputDict"+str(i)) for i in xrange(1, 40))
# Load them all in parallel, and sort the results by length (lambda is redundant)
arr = sorted(pool.imap_unordered(gainOneFile, paths), key=len)
其次,multiprocessing
必须 pickle 和 unpickle 在主进程和 worker 之间发送的所有参数和 return 值,并且所有这些都是通过管道发送的,这会导致系统调用开销以启动。由于您的文件系统不太可能通过并行化读取获得显着速度,因此它可能是净损失,而不是收益。
您可能可以通过切换到基于线程的池来获得一些提升;将 import
更改为 import multiprocessing.dummy as mp
,您将获得根据线程实现的 Pool
版本;它们不围绕 CPython GIL 工作,但由于这段代码几乎肯定是 I/O 绑定的,所以这无关紧要,它消除了 pickling 和 unpickling 以及工人通信中涉及的 IPC。
最后,如果您在类似 UNIX 的系统上使用 Python 3.3 或更高版本,您也许可以通过 OS 将文件拉入系统来帮助您解决问题更积极地缓存。如果您可以打开文件,则在文件描述符上使用 os.posix_fadvise
(在文件对象上使用 .fileno()
)和 WILLNEED
或 SEQUENTIAL
它 可能 通过在请求之前主动预取文件数据来提高稍后读取文件时的读取性能。
我的代码中有一个函数应该读取文件。每个文件大约 8M,但是读取速度太慢,为了改进我使用 multiprocessing.sadly,它似乎得到了 blocked.i想知道有什么方法可以帮助解决这个问题并提高阅读速度吗?
我的代码如下:
import multiprocessing as mp
import json
import os
def gainOneFile(filename):
file_from = open(filename)
json_str = file_from.read()
temp = json.loads(json_str)
print "load:",filename," len ",len(temp)
file_from.close()
return temp
def gainSortedArr(path):
arr = []
pool = mp.Pool(4)
for i in xrange(1,40):
abs_from_filename = os.path.join(path, "outputDict"+str(i))
result = pool.apply_async(gainOneFile,(abs_from_filename,))
arr.append(result.get())
pool.close()
pool.join()
arr = sorted(arr,key = lambda dic:len(dic))
return arr
和调用函数:
whole_arr = gainSortedArr("sortKeyOut/")
你有一些问题。首先,你没有并行化。你这样做:
result = pool.apply_async(gainOneFile,(abs_from_filename,))
arr.append(result.get())
一遍又一遍,分派一个任务,然后立即调用 .get()
等待它完成,然后再分派任何其他任务;你实际上从来没有同时拥有超过一名工人运行。存储所有结果而不调用 .get()
,然后稍后调用 .get()
。或者只使用 Pool.map
或相关方法,为自己省去手动个人结果管理的一些麻烦,例如(使用 imap_unordered
来最小化开销,因为你只是在排序):
# Make generator of paths to load
paths = (os.path.join(path, "outputDict"+str(i)) for i in xrange(1, 40))
# Load them all in parallel, and sort the results by length (lambda is redundant)
arr = sorted(pool.imap_unordered(gainOneFile, paths), key=len)
其次,multiprocessing
必须 pickle 和 unpickle 在主进程和 worker 之间发送的所有参数和 return 值,并且所有这些都是通过管道发送的,这会导致系统调用开销以启动。由于您的文件系统不太可能通过并行化读取获得显着速度,因此它可能是净损失,而不是收益。
您可能可以通过切换到基于线程的池来获得一些提升;将 import
更改为 import multiprocessing.dummy as mp
,您将获得根据线程实现的 Pool
版本;它们不围绕 CPython GIL 工作,但由于这段代码几乎肯定是 I/O 绑定的,所以这无关紧要,它消除了 pickling 和 unpickling 以及工人通信中涉及的 IPC。
最后,如果您在类似 UNIX 的系统上使用 Python 3.3 或更高版本,您也许可以通过 OS 将文件拉入系统来帮助您解决问题更积极地缓存。如果您可以打开文件,则在文件描述符上使用 os.posix_fadvise
(在文件对象上使用 .fileno()
)和 WILLNEED
或 SEQUENTIAL
它 可能 通过在请求之前主动预取文件数据来提高稍后读取文件时的读取性能。