Python 多处理:全局对象未正确复制到子对象
Python multiprocessing: Global objects not being copied to children properly
几天前,我回答了 关于并行读取 tar 文件的问题。
这就是问题的要点:
import bz2
import tarfile
from multiprocessing import Pool
tr = tarfile.open('data.tar')
def clean_file(tar_file_entry):
if '.bz2' not in str(tar_file_entry):
return
with tr.extractfile(tar_file_entry) as bz2_file:
with bz2.open(bz2_file, "rt") as bzinput:
# Reading bz2 file
....
....
def process_serial():
members = tr.getmembers()
processed_files = []
for i, member in enumerate(members):
processed_files.append(clean_file(member))
print(f'done {i}/{len(members)}')
def process_parallel():
members = tr.getmembers()
with Pool() as pool:
processed_files = pool.map(clean_file, members)
print(processed_files)
def main():
process_serial() # No error
process_parallel() # Error
if __name__ == '__main__':
main()
如 .
中所述,我们只需在子进程中而不是在父进程中打开 tar 文件即可使错误消失
我无法理解为什么会这样。
即使我们在父进程中打开tar文件,子进程也会得到一份新的副本。
那么,为什么在子进程中打开 tar 文件会产生明显的影响?
这是否意味着在第一种情况下,子进程以某种方式改变了公共 tar 文件对象并由于并发写入而导致内存损坏?
FWIW,open
评论中关于文件句柄号的答案在类 UNIX 系统上实际上是不正确的。
如果 multiprocessing
使用 fork()
(它在 Linux 和类似的情况下使用,尽管我读到在 macOS 上分叉有问题),文件句柄和其他所有内容都是愉快地复制到子进程(“愉快地”我的意思是它在许多边缘情况下很复杂,例如分叉线程,但它仍然适用于文件句柄)。
以下对我来说很好用:
import multiprocessing
this = open(__file__, 'r')
def read_file():
print(len(this.read()))
def main():
process = multiprocessing.Process(target=read_file)
process.start()
process.join()
if __name__ == '__main__':
main()
问题可能是 tarfile
有一个内部结构 and/or 在读取时进行缓冲,您也可以通过尝试查找和读取相同结构的不同部分来简单地 运行 陷入冲突同时存档。也就是说,我推测在这种情况下使用没有任何同步的线程池可能 运行 会出现完全相同的问题。
编辑:澄清一下,从 Tar 存档中提取文件是 可能(我还没有检查具体细节)如下:(1)寻找封装部分(文件)的偏移量,(2)读取封装文件的块,将块写入目标文件(或管道,或w/e ), (3) 重复(2)直到提取出整个文件。
通过使用相同文件句柄的并行进程尝试以非同步方式执行此操作,可能会导致这些步骤混合,即开始处理文件 #2 将从文件 #1 开始,而我们正在读取文件 #1 等
Edit2 回答下面的评论:内存表示为子进程重新分叉,这是真的;但在内核端管理的资源(例如文件句柄和内核缓冲区)是共享的。
举例说明:
import multiprocessing
this = open(__file__, 'rb')
def read_file(worker):
print(worker, this.read(80))
def main():
processes = []
for number in (1, 2):
processes.append(
multiprocessing.Process(target=read_file, args=(number,)))
for process in processes:
process.start()
for process in processes:
process.join()
if __name__ == '__main__':
main()
运行 这个在 Linux 我得到:
$ python3.8 test.py
1 b"import multiprocessing\n\nthis = open(__file__, 'rb')\n\n\ndef read_file(worker):\n "
2 b''
如果查找和阅读是独立的,那么这两个过程会打印出相同的结果,但事实并非如此。由于这是一个小文件,并且 Python 选择缓冲少量数据 (8 KiB),第一个进程读取到 EOF,第二个进程没有剩余数据可读(除非它当然寻求返回)。
几天前,我回答了
这就是问题的要点:
import bz2
import tarfile
from multiprocessing import Pool
tr = tarfile.open('data.tar')
def clean_file(tar_file_entry):
if '.bz2' not in str(tar_file_entry):
return
with tr.extractfile(tar_file_entry) as bz2_file:
with bz2.open(bz2_file, "rt") as bzinput:
# Reading bz2 file
....
....
def process_serial():
members = tr.getmembers()
processed_files = []
for i, member in enumerate(members):
processed_files.append(clean_file(member))
print(f'done {i}/{len(members)}')
def process_parallel():
members = tr.getmembers()
with Pool() as pool:
processed_files = pool.map(clean_file, members)
print(processed_files)
def main():
process_serial() # No error
process_parallel() # Error
if __name__ == '__main__':
main()
如
我无法理解为什么会这样。
即使我们在父进程中打开tar文件,子进程也会得到一份新的副本。 那么,为什么在子进程中打开 tar 文件会产生明显的影响?
这是否意味着在第一种情况下,子进程以某种方式改变了公共 tar 文件对象并由于并发写入而导致内存损坏?
FWIW,open
评论中关于文件句柄号的答案在类 UNIX 系统上实际上是不正确的。
如果 multiprocessing
使用 fork()
(它在 Linux 和类似的情况下使用,尽管我读到在 macOS 上分叉有问题),文件句柄和其他所有内容都是愉快地复制到子进程(“愉快地”我的意思是它在许多边缘情况下很复杂,例如分叉线程,但它仍然适用于文件句柄)。
以下对我来说很好用:
import multiprocessing
this = open(__file__, 'r')
def read_file():
print(len(this.read()))
def main():
process = multiprocessing.Process(target=read_file)
process.start()
process.join()
if __name__ == '__main__':
main()
问题可能是 tarfile
有一个内部结构 and/or 在读取时进行缓冲,您也可以通过尝试查找和读取相同结构的不同部分来简单地 运行 陷入冲突同时存档。也就是说,我推测在这种情况下使用没有任何同步的线程池可能 运行 会出现完全相同的问题。
编辑:澄清一下,从 Tar 存档中提取文件是 可能(我还没有检查具体细节)如下:(1)寻找封装部分(文件)的偏移量,(2)读取封装文件的块,将块写入目标文件(或管道,或w/e ), (3) 重复(2)直到提取出整个文件。
通过使用相同文件句柄的并行进程尝试以非同步方式执行此操作,可能会导致这些步骤混合,即开始处理文件 #2 将从文件 #1 开始,而我们正在读取文件 #1 等
Edit2 回答下面的评论:内存表示为子进程重新分叉,这是真的;但在内核端管理的资源(例如文件句柄和内核缓冲区)是共享的。
举例说明:
import multiprocessing
this = open(__file__, 'rb')
def read_file(worker):
print(worker, this.read(80))
def main():
processes = []
for number in (1, 2):
processes.append(
multiprocessing.Process(target=read_file, args=(number,)))
for process in processes:
process.start()
for process in processes:
process.join()
if __name__ == '__main__':
main()
运行 这个在 Linux 我得到:
$ python3.8 test.py
1 b"import multiprocessing\n\nthis = open(__file__, 'rb')\n\n\ndef read_file(worker):\n "
2 b''
如果查找和阅读是独立的,那么这两个过程会打印出相同的结果,但事实并非如此。由于这是一个小文件,并且 Python 选择缓冲少量数据 (8 KiB),第一个进程读取到 EOF,第二个进程没有剩余数据可读(除非它当然寻求返回)。