为什么不能在init方法中添加self.fh形式的文件处理程序?
Why can't add file handler with the form of self.fh in the init method?
os 和 python 信息:
uname -a
Linux debian 5.10.0-8-amd64 #1 SMP Debian 5.10.46-4 (2021-08-03) x86_64 GNU/Linux
python3 --version
Python 3.9.2
这里有一个简单的class可以启动多处理。
from multiprocessing.pool import Pool
class my_mp(object):
def __init__(self):
self.process_num = 3
fh = open('test.txt', 'w')
def run_task(self,i):
print('process {} start'.format(str(i)))
time.sleep(2)
print('process {} end'.format(str(i)))
def run(self):
pool = Pool(processes = self.process_num)
for i in range(self.process_num):
pool.apply_async(self.run_task,args = (i,))
pool.close()
pool.join()
初始化 my_mp
class,然后启动多进程。
ins = my_mp()
ins.run()
process 0 start
process 1 start
process 2 start
process 0 end
process 2 end
process 1 end
现在 my_mp
class 中的 fh = open('test.txt', 'w')
替换为 self.fh = open('test.txt', 'w')
并重试。
ins = my_mp()
ins.run()
没有输出!为什么没有进程启动?
>>> from multiprocessing.pool import Pool
>>>
>>> class my_mp(object):
... def __init__(self):
... self.process_num = 3
... fh = open('test.txt', 'w')
... def run_task(self,i):
... print('process {} start'.format(str(i)))
... time.sleep(2)
... print('process {} end'.format(str(i)))
... def run(self):
... pool = Pool(processes = self.process_num)
... for i in range(self.process_num):
... pool.apply_async(self.run_task,args = (i,))
... pool.close()
... pool.join()
...
>>> x = my_mp()
>>> x.run()
process 0 start
process 1 start
process 2 start
process 2 end
process 0 end
process 1 end
>>> class my_mp(object):
... def __init__(self):
... self.process_num = 3
... self.fh = open('test.txt', 'w')
... def run_task(self,i):
... print('process {} start'.format(str(i)))
... time.sleep(2)
... print('process {} end'.format(str(i)))
... def run(self):
... pool = Pool(processes = self.process_num)
... for i in range(self.process_num):
... pool.apply_async(self.run_task,args = (i,))
... pool.close()
... pool.join()
...
>>> x = my_mp()
>>> x.run()
>>> x.run()
>>> x = my_mp()
>>> class my_mp(object):
... def __init__(self):
... self.process_num = 3
... fh = open('test.txt', 'w')
... self.fh = fh
... def run_task(self,i):
... print('process {} start'.format(str(i)))
... time.sleep(2)
... print('process {} end'.format(str(i)))
... def run(self):
... pool = Pool(processes = self.process_num)
... for i in range(self.process_num):
... pool.apply_async(self.run_task,args = (i,))
... pool.close()
... pool.join()
...
>>> x = my_mp()
>>> x.run()
>>>
为什么在__init__
方法中不能添加self.fh
形式的文件处理程序?我从来没有在任何进程中调用过__init__
中定义的文件处理程序。
我做了一些调查,但没有完全回答问题。我将post这里的结果,以防他们帮助别人。
首先,如果子流程失败,没有回溯。所以我添加了额外的行来显示子进程的输出。如果没有错误发生,它应该是 None
。新代码:
for i in range(3):
res = pool.apply_async(self.run_task, args=(i,))
print(res.get())
输出
Traceback (most recent call last):
File "C:/temp/LeetCode-solutions/multithreading.py", line 43, in <module>
mp.run()
File "C:/temp/LeetCode-solutions/multithreading.py", line 19, in run
self.multiprocessing()
File "C:/temp/LeetCode-solutions/multithreading.py", line 30, in multiprocessing
print(res.get())
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\pool.py", line 771, in get
raise self._value
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\pool.py", line 537, in _handle_tasks
put(task)
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
程序似乎将文件对象作为 self.run_task
参数的一部分获取。该错误在 Whosebug 上由来已久,但 IMO 的最佳解释如下:
https://discuss.python.org/t/use-multiprocessing-module-to-handle-a-large-file-in-python/6604
我没有找到为什么文件对象的属性会生成 class 文件对象的所有属性,但我希望这能揭开一些谜团。
最终测试:以下代码按预期工作
from multiprocessing.pool import Pool
import time
class MyMP(object):
def __init__(self):
self.process_num = 3
def run(self):
self.fh = open('test.txt', 'w')
pool = Pool(processes=3)
for i in range(3):
res = pool.apply_async(run_task, args=(i,))
print(res.get())
pool.close()
pool.join()
self.fh.close()
def run_task(i):
print('process {} start'.format(str(i)))
time.sleep(2)
print('process {} end'.format(str(i)))
问题:
Stdlib multiprocessing 使用 pickle 序列化对象。任何需要跨进程边界发送的东西都需要是可腌制的。
自定义 class 实例通常是可腌制的,只要它们的所有属性都是可腌制的 - 它通过在子进程中导入类型并取消腌制属性来工作。
问题是 open()
返回的对象不可 picklable。
>>> class A:
... pass
...
>>> import pickle
>>> pickle.dumps(A())
b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x01A\x94\x93\x94)\x81\x94.'
>>> class A:
... def __init__(self):
... self.fh = open("test.txt", "w")
...
>>> pickle.dumps(A())
TypeError: cannot pickle '_io.TextIOWrapper' object
在第一种情况下,多处理池仍然有效,因为 fh
只是一个局部变量,一旦超出范围就会被删除,即当 __init__
方法 returns.但是一旦您使用 self.fh = open(...)
将该句柄保存到实例的命名空间中,就会保留一个引用,并且需要通过进程边界发送它。
您可能认为,因为您只安排了方法 self.run_task
在池中执行,所以从 __init__
设置的状态并不重要,但事实并非如此。还有一个参考:
>>> ins = my_mp()
>>> ins.run_task.__self__.__dict__
{'process_num': 3,
'fh': <_io.TextIOWrapper name='test.txt' mode='w' encoding='UTF-8'>}
请注意,调用 ins = my_mp()
在主进程中运行 __init__
方法,ins.run_task
是通过进程边界发送的对象。
解决方案:
有一个第三方库提供了 stdlib 多处理池的直接替换 - pip install pathos
并将多处理导入替换为:
from pathos.multiprocessing import Pool
pathos uses dill,一个比pickle更强大的序列化库,可以序列化open()
返回的对象。您的代码应该可以再次运行而无需任何其他更改。但是,您应该注意每个工作进程都不知道其他进程将字节写入 self.fh
,因此最后写入的工作进程可能会覆盖其他进程较早写入的数据。
os 和 python 信息:
uname -a
Linux debian 5.10.0-8-amd64 #1 SMP Debian 5.10.46-4 (2021-08-03) x86_64 GNU/Linux
python3 --version
Python 3.9.2
这里有一个简单的class可以启动多处理。
from multiprocessing.pool import Pool
class my_mp(object):
def __init__(self):
self.process_num = 3
fh = open('test.txt', 'w')
def run_task(self,i):
print('process {} start'.format(str(i)))
time.sleep(2)
print('process {} end'.format(str(i)))
def run(self):
pool = Pool(processes = self.process_num)
for i in range(self.process_num):
pool.apply_async(self.run_task,args = (i,))
pool.close()
pool.join()
初始化 my_mp
class,然后启动多进程。
ins = my_mp()
ins.run()
process 0 start
process 1 start
process 2 start
process 0 end
process 2 end
process 1 end
现在 my_mp
class 中的 fh = open('test.txt', 'w')
替换为 self.fh = open('test.txt', 'w')
并重试。
ins = my_mp()
ins.run()
没有输出!为什么没有进程启动?
>>> from multiprocessing.pool import Pool
>>>
>>> class my_mp(object):
... def __init__(self):
... self.process_num = 3
... fh = open('test.txt', 'w')
... def run_task(self,i):
... print('process {} start'.format(str(i)))
... time.sleep(2)
... print('process {} end'.format(str(i)))
... def run(self):
... pool = Pool(processes = self.process_num)
... for i in range(self.process_num):
... pool.apply_async(self.run_task,args = (i,))
... pool.close()
... pool.join()
...
>>> x = my_mp()
>>> x.run()
process 0 start
process 1 start
process 2 start
process 2 end
process 0 end
process 1 end
>>> class my_mp(object):
... def __init__(self):
... self.process_num = 3
... self.fh = open('test.txt', 'w')
... def run_task(self,i):
... print('process {} start'.format(str(i)))
... time.sleep(2)
... print('process {} end'.format(str(i)))
... def run(self):
... pool = Pool(processes = self.process_num)
... for i in range(self.process_num):
... pool.apply_async(self.run_task,args = (i,))
... pool.close()
... pool.join()
...
>>> x = my_mp()
>>> x.run()
>>> x.run()
>>> x = my_mp()
>>> class my_mp(object):
... def __init__(self):
... self.process_num = 3
... fh = open('test.txt', 'w')
... self.fh = fh
... def run_task(self,i):
... print('process {} start'.format(str(i)))
... time.sleep(2)
... print('process {} end'.format(str(i)))
... def run(self):
... pool = Pool(processes = self.process_num)
... for i in range(self.process_num):
... pool.apply_async(self.run_task,args = (i,))
... pool.close()
... pool.join()
...
>>> x = my_mp()
>>> x.run()
>>>
为什么在__init__
方法中不能添加self.fh
形式的文件处理程序?我从来没有在任何进程中调用过__init__
中定义的文件处理程序。
我做了一些调查,但没有完全回答问题。我将post这里的结果,以防他们帮助别人。
首先,如果子流程失败,没有回溯。所以我添加了额外的行来显示子进程的输出。如果没有错误发生,它应该是 None
。新代码:
for i in range(3):
res = pool.apply_async(self.run_task, args=(i,))
print(res.get())
输出
Traceback (most recent call last):
File "C:/temp/LeetCode-solutions/multithreading.py", line 43, in <module>
mp.run()
File "C:/temp/LeetCode-solutions/multithreading.py", line 19, in run
self.multiprocessing()
File "C:/temp/LeetCode-solutions/multithreading.py", line 30, in multiprocessing
print(res.get())
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\pool.py", line 771, in get
raise self._value
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\pool.py", line 537, in _handle_tasks
put(task)
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
程序似乎将文件对象作为 self.run_task
参数的一部分获取。该错误在 Whosebug 上由来已久,但 IMO 的最佳解释如下:
https://discuss.python.org/t/use-multiprocessing-module-to-handle-a-large-file-in-python/6604
我没有找到为什么文件对象的属性会生成 class 文件对象的所有属性,但我希望这能揭开一些谜团。
最终测试:以下代码按预期工作
from multiprocessing.pool import Pool
import time
class MyMP(object):
def __init__(self):
self.process_num = 3
def run(self):
self.fh = open('test.txt', 'w')
pool = Pool(processes=3)
for i in range(3):
res = pool.apply_async(run_task, args=(i,))
print(res.get())
pool.close()
pool.join()
self.fh.close()
def run_task(i):
print('process {} start'.format(str(i)))
time.sleep(2)
print('process {} end'.format(str(i)))
问题:
Stdlib multiprocessing 使用 pickle 序列化对象。任何需要跨进程边界发送的东西都需要是可腌制的。
自定义 class 实例通常是可腌制的,只要它们的所有属性都是可腌制的 - 它通过在子进程中导入类型并取消腌制属性来工作。
问题是 open()
返回的对象不可 picklable。
>>> class A:
... pass
...
>>> import pickle
>>> pickle.dumps(A())
b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x01A\x94\x93\x94)\x81\x94.'
>>> class A:
... def __init__(self):
... self.fh = open("test.txt", "w")
...
>>> pickle.dumps(A())
TypeError: cannot pickle '_io.TextIOWrapper' object
在第一种情况下,多处理池仍然有效,因为 fh
只是一个局部变量,一旦超出范围就会被删除,即当 __init__
方法 returns.但是一旦您使用 self.fh = open(...)
将该句柄保存到实例的命名空间中,就会保留一个引用,并且需要通过进程边界发送它。
您可能认为,因为您只安排了方法 self.run_task
在池中执行,所以从 __init__
设置的状态并不重要,但事实并非如此。还有一个参考:
>>> ins = my_mp()
>>> ins.run_task.__self__.__dict__
{'process_num': 3,
'fh': <_io.TextIOWrapper name='test.txt' mode='w' encoding='UTF-8'>}
请注意,调用 ins = my_mp()
在主进程中运行 __init__
方法,ins.run_task
是通过进程边界发送的对象。
解决方案:
有一个第三方库提供了 stdlib 多处理池的直接替换 - pip install pathos
并将多处理导入替换为:
from pathos.multiprocessing import Pool
pathos uses dill,一个比pickle更强大的序列化库,可以序列化open()
返回的对象。您的代码应该可以再次运行而无需任何其他更改。但是,您应该注意每个工作进程都不知道其他进程将字节写入 self.fh
,因此最后写入的工作进程可能会覆盖其他进程较早写入的数据。