为什么不能在init方法中添加self.fh形式的文件处理程序?

Why can't add file handler with the form of self.fh in the init method?

os 和 python 信息:

uname -a
Linux debian 5.10.0-8-amd64 #1 SMP Debian 5.10.46-4 (2021-08-03) x86_64 GNU/Linux
python3 --version
Python 3.9.2

这里有一个简单的class可以启动多处理。

from multiprocessing.pool import Pool    

class my_mp(object):
    def __init__(self):
        self.process_num = 3
        fh = open('test.txt', 'w')
    def run_task(self,i):
        print('process {} start'.format(str(i)))
        time.sleep(2)
        print('process {} end'.format(str(i)))
    def run(self):
        pool = Pool(processes = self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task,args = (i,))
        pool.close()
        pool.join()

初始化 my_mp class,然后启动多进程。

ins = my_mp()
ins.run()
process 0 start
process 1 start
process 2 start
process 0 end
process 2 end
process 1 end

现在 my_mp class 中的 fh = open('test.txt', 'w') 替换为 self.fh = open('test.txt', 'w') 并重试。

ins = my_mp()
ins.run()    

没有输出!为什么没有进程启动?

>>> from multiprocessing.pool import Pool    
>>> 
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
process 0 start
process 1 start
process 2 start
process 2 end
process 0 end
process 1 end
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         self.fh = open('test.txt', 'w')
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
>>> x.run()
>>> x = my_mp()
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...         self.fh = fh
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
>>> 

为什么在__init__方法中不能添加self.fh形式的文件处理程序?我从来没有在任何进程中调用过__init__中定义的文件处理程序。

我做了一些调查,但没有完全回答问题。我将post这里的结果,以防他们帮助别人。

首先,如果子流程失败,没有回溯。所以我添加了额外的行来显示子进程的输出。如果没有错误发生,它应该是 None。新代码:

        for i in range(3):
            res = pool.apply_async(self.run_task, args=(i,))
            print(res.get())

输出

Traceback (most recent call last):
  File "C:/temp/LeetCode-solutions/multithreading.py", line 43, in <module>
    mp.run()
  File "C:/temp/LeetCode-solutions/multithreading.py", line 19, in run
    self.multiprocessing()
  File "C:/temp/LeetCode-solutions/multithreading.py", line 30, in multiprocessing
    print(res.get())
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\pool.py", line 771, in get
    raise self._value
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\pool.py", line 537, in _handle_tasks
    put(task)
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object

程序似乎将文件对象作为 self.run_task 参数的一部分获取。该错误在 Whosebug 上由来已久,但 IMO 的最佳解释如下: https://discuss.python.org/t/use-multiprocessing-module-to-handle-a-large-file-in-python/6604

我没有找到为什么文件对象的属性会生成 class 文件对象的所有属性,但我希望这能揭开一些谜团。

最终测试:以下代码按预期工作

from multiprocessing.pool import Pool
import time


class MyMP(object):
    def __init__(self):
        self.process_num = 3

    def run(self):
        self.fh = open('test.txt', 'w')
        pool = Pool(processes=3)
        for i in range(3):
            res = pool.apply_async(run_task, args=(i,))
            print(res.get())
        pool.close()
        pool.join()
        self.fh.close()

def run_task(i):
    print('process {} start'.format(str(i)))
    time.sleep(2)
    print('process {} end'.format(str(i)))

问题:

Stdlib multiprocessing 使用 pickle 序列化对象。任何需要跨进程边界发送的东西都需要是可腌制的。

自定义 class 实例通常是可腌制的,只要它们的所有属性都是可腌制的 - 它通过在子进程中导入类型并取消腌制属性来工作。

问题是 open() 返回的对象不可 picklable。

>>> class A:
...     pass
... 
>>> import pickle
>>> pickle.dumps(A())
b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x01A\x94\x93\x94)\x81\x94.'
>>> class A:
...     def __init__(self):
...         self.fh = open("test.txt", "w")
... 
>>> pickle.dumps(A())
TypeError: cannot pickle '_io.TextIOWrapper' object

在第一种情况下,多处理池仍然有效,因为 fh 只是一个局部变量,一旦超出范围就会被删除,即当 __init__ 方法 returns.但是一旦您使用 self.fh = open(...) 将该句柄保存到实例的命名空间中,就会保留一个引用,并且需要通过进程边界发送它。

您可能认为,因为您只安排了方法 self.run_task 在池中执行,所以从 __init__ 设置的状态并不重要,但事实并非如此。还有一个参考:

>>> ins = my_mp()
>>> ins.run_task.__self__.__dict__
{'process_num': 3,
 'fh': <_io.TextIOWrapper name='test.txt' mode='w' encoding='UTF-8'>}

请注意,调用 ins = my_mp() 在主进程中运行 __init__ 方法,ins.run_task 是通过进程边界发送的对象。

解决方案:

有一个第三方库提供了 stdlib 多处理池的直​​接替换 - pip install pathos 并将多处理导入替换为:

from pathos.multiprocessing import Pool

pathos uses dill,一个比pickle更强大的序列化库,可以序列化open()返回的对象。您的代码应该可以再次运行而无需任何其他更改。但是,您应该注意每个工作进程都不知道其他进程将字节写入 self.fh,因此最后写入的工作进程可能会覆盖其他进程较早写入的数据。