Python 多处理:全局对象未正确复制到子对象

Python multiprocessing: Global objects not being copied to children properly

几天前,我回答了 关于并行读取 tar 文件的问题。

这就是问题的要点:

import bz2
import tarfile
from multiprocessing import Pool

tr = tarfile.open('data.tar')

def clean_file(tar_file_entry):
    if '.bz2' not in str(tar_file_entry):
        return
    with tr.extractfile(tar_file_entry) as bz2_file:
        with bz2.open(bz2_file, "rt") as bzinput:
            # Reading bz2 file
            ....
            .... 


def process_serial():
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i}/{len(members)}')


def process_parallel():
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_serial() # No error
    process_parallel() # Error


if __name__ == '__main__':
    main()

.

中所述,我们只需在子进程中而不是在父进程中打开 tar 文件即可使错误消失

我无法理解为什么会这样。

即使我们在父进程中打开tar文件,子进程也会得到一份新的副本。 那么,为什么在子进程中打开 tar 文件会产生明显的影响?

这是否意味着在第一种情况下,子进程以某种方式改变了公共 tar 文件对象并由于并发写入而导致内存损坏?

FWIW,open 评论中关于文件句柄号的答案在类 UNIX 系统上实际上是不正确的。

如果 multiprocessing 使用 fork()(它在 Linux 和类似的情况下使用,尽管我读到在 macOS 上分叉有问题),文件句柄和其他所有内容都是愉快地复制到子进程(“愉快地”我的意思是它在许多边缘情况下很复杂,例如分叉线程,但它仍然适用于文件句柄)。

以下对我来说很好用:

import multiprocessing

this = open(__file__, 'r')


def read_file():
    print(len(this.read()))


def main():
    process = multiprocessing.Process(target=read_file)
    process.start()
    process.join()


if __name__ == '__main__':
    main()

问题可能是 tarfile 有一个内部结构 and/or 在读取时进行缓冲,您也可以通过尝试查找和读取相同结构的不同部分来简单地 运行 陷入冲突同时存档。也就是说,我推测在这种情况下使用没有任何同步的线程池可能 运行 会出现完全相同的问题。

编辑:澄清一下,从 Tar 存档中提取文件是 可能(我还没有检查具体细节)如下:(1)寻找封装部分(文件)的偏移量,(2)读取封装文件的块,将块写入目标文件(或管道,或w/e ), (3) 重复(2)直到提取出整个文件。

通过使用相同文件句柄的并行进程尝试以非同步方式执行此操作,可能会导致这些步骤混合,即开始处理文件 #2 将从文件 #1 开始,而我们正在读取文件 #1 等

Edit2 回答下面的评论:内存表示为子进程重新分叉,这是真的;但在内核端管理的资源(例如文件句柄和内核缓冲区)是共享的。

举例说明:

import multiprocessing

this = open(__file__, 'rb')


def read_file(worker):
    print(worker, this.read(80))


def main():
    processes = []

    for number in (1, 2):
        processes.append(
            multiprocessing.Process(target=read_file, args=(number,)))

    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()

运行 这个在 Linux 我得到:

$ python3.8 test.py 
1 b"import multiprocessing\n\nthis = open(__file__, 'rb')\n\n\ndef read_file(worker):\n   "
2 b''

如果查找和阅读是独立的,那么这两个过程会打印出相同的结果,但事实并非如此。由于这是一个小文件,并且 Python 选择缓冲少量数据 (8 KiB),第一个进程读取到 EOF,第二个进程没有剩余数据可读(除非它当然寻求返回)。