Why does multiprocessing not work when opening a file?
While experimenting with the multiprocessing Pool module, I noticed that it stops working whenever I load/open any kind of file. The code below works as expected, but when I uncomment lines 8-9, the script skips the pool.apply_async call and loopingTest never runs.
import time
from multiprocessing import Pool


class MultiClass:
    def __init__(self):
        file = 'test.txt'
        # with open(file, 'r') as f:  # This is the culprit
        #     self.d = f
        self.n = 50000000
        self.cases = ['1st time', '2nd time']
        self.multiProc(self.cases)
        print("It's done")

    def loopingTest(self, cases):
        print(f"looping start for {cases}")
        n = self.n
        while n > 0:
            n -= 1
        print(f"looping done for {cases}")

    def multiProc(self, cases):
        test = False
        pool = Pool(processes=2)
        if not test:
            for i in cases:
                pool.apply_async(self.loopingTest, (i,))
        pool.close()
        pool.join()


if __name__ == '__main__':
    start = time.time()
    w = MultiClass()
    end = time.time()
    print(f'Script finished in {end - start} seconds')
You're seeing this behavior because saving the file object (self.d) on your instance makes the call to apply_async fail. When you call apply_async(self.loopingTest, ...), Python needs to pickle self.loopingTest to send it to the worker process, and that in turn requires pickling self. With an open file object stored as an attribute of self, pickling fails, because file objects can't be pickled. You can see this for yourself by using apply instead of apply_async in your sample code. You'll get an error like this:
Traceback (most recent call last):
File "a.py", line 36, in <module>
w = MultiClass()
File "a.py", line 12, in __init__
self.multiProc(self.cases)
File "a.py", line 28, in multiProc
out.get()
File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
File "/usr/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
put(task)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
You need to change your code to either avoid saving the file object to self, creating it only inside the worker method (if that's where you need to use it), or to control the pickle/unpickle process for your class using the tools Python provides (the __getstate__/__setstate__ hooks). Depending on the use case, you can also convert the method you pass to apply_async into a top-level function, so that self doesn't need to be pickled at all.
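A minimal sketch of the "control the pickle/unpickle process" option, using the __getstate__/__setstate__ hooks. The class name FileBackedWorker and its attributes are hypothetical, not from the original post; the idea is to drop the unpicklable file object from the pickled state and reopen it on the receiving side.

```python
import os
import pickle
import tempfile


class FileBackedWorker:
    """Hypothetical class whose open file handle is excluded from pickling."""

    def __init__(self, path):
        self.path = path
        self.f = open(path, 'r')  # unpicklable attribute

    def __getstate__(self):
        # Copy the instance dict and remove the open file object.
        state = self.__dict__.copy()
        del state['f']
        return state

    def __setstate__(self, state):
        # Restore the plain attributes, then reopen the file in this process.
        self.__dict__.update(state)
        self.f = open(self.path, 'r')


# Demo: pickling now succeeds because the file object never enters the state.
with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as tmp:
    tmp.write('hello')
    path = tmp.name

w = FileBackedWorker(path)
clone = pickle.loads(pickle.dumps(w))  # would raise TypeError without the hooks
text = clone.f.read()
w.f.close()
clone.f.close()
os.remove(path)
print(text)  # prints "hello"
```

This is the same mechanism multiprocessing uses internally, so once the class round-trips through pickle, apply_async(self.loopingTest, ...) can ship the bound method to the workers.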
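And a sketch of the top-level-function approach, adapted from the script above (looping_test, multi_proc, and the reduced loop count are assumptions for brevity). Because looping_test is a module-level function taking only picklable arguments, self never crosses the process boundary; the file's *contents* (not the handle) can then safely live on self.

```python
import os
from multiprocessing import Pool


def looping_test(case, n):
    """Top-level function: pickled by reference, no `self` involved."""
    while n > 0:
        n -= 1
    return f"looping done for {case}"


class MultiClass:
    def __init__(self, path):
        # Store only the file contents; the handle is closed immediately.
        with open(path, 'r') as f:
            self.d = f.read()
        self.n = 100000  # smaller than the original 50000000, for the demo
        self.cases = ['1st time', '2nd time']

    def multi_proc(self):
        with Pool(processes=2) as pool:
            async_results = [pool.apply_async(looping_test, (c, self.n))
                             for c in self.cases]
            return [r.get() for r in async_results]


if __name__ == '__main__':
    with open('test.txt', 'w') as f:
        f.write('demo')
    w = MultiClass('test.txt')
    print(w.multi_proc())
    os.remove('test.txt')
```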