Python 多处理和文件查找
Python multiprocessing and file seeks
我正在尝试使用 multiprocessing
包同时读取文件并在一些数据转换后覆盖(部分)文件。我知道这似乎有点抽象,但我可以利用这种并发性来加速我自己的 blocksync
fork.
您可以在下面找到我的代码片段:
#!/usr/bin/python2
import multiprocessing
import sys
import time
blocksize=1024
def do_open(f, mode):
f = open(f, mode)
f.seek(0, 2)
size = f.tell()
f.seek(0)
return f, size
def pipe_getblocks(f, pipe, side):
print "Child file object ID: "+str(id(f))
while True:
print "getblocks_seek_prev: "+str(f.tell())
block = f.read(blocksize)
if not block:
break
print "getblocks_seek_next: "+str(f.tell())
pipe.send(block)
def pipe_server(dev):
f, size = do_open(dev, 'r+')
parent,child = multiprocessing.Pipe(False)
reader = multiprocessing.Process(target=pipe_getblocks, args=(f,child,"R"))
reader.daemon = True
reader.start()
child.close()
i = 0
print "Parent file object ID:"+str(id(f))
while True:
try:
block = parent.recv()
except:
break
else:
print str(i)+":pseek: "+str(f.tell()/1024/1024)
f.seek(0,0) # This seek should not be see in the child subprocess...
i = i+1
pipe_server("/root/random.img")
基本上,parent 进程应该等待 child 填充管道,然后从中读取。请注意 f.seek(0,0)
行:我把它放在这里是为了验证 parent 和 child 各自对在文件中查找的位置有自己的想法。换句话说,作为两个完全不同的进程,我希望在 parent 上完成的 f.seek
对其 child.
没有影响
然而,这个假设似乎是错误的,因为上面的程序产生了以下输出:
Child file object ID: 140374094691616
getblocks_seek_prev: 0
getblocks_seek_next: 1024
...
getblocks_seek_next: 15360
getblocks_seek_prev: 15360
getblocks_seek_next: 16384
getblocks_seek_prev: 16384
getblocks_seek_next: 17408 <-- past EOF!
getblocks_seek_prev: 17408 <-- past EOF!
getblocks_seek_next: 18432 <-- past EOF!
getblocks_seek_prev: 18432 <-- past EOF!
...
Parent file object ID:140374094691616
0:pseek: 0
1:pseek: 0
2:pseek: 0
3:pseek: 0
4:pseek: 0
5:pseek: 0
6:pseek: 0
7:pseek: 0
8:pseek: 0
9:pseek: 0
10:pseek: 0
...
如您所见,child 进程 读过了它的 EOF 或者,嗯,它 认为 是这样,因为它实际上是从文件的开头读取的。简而言之,似乎 parent 的 f.seek(0,0)
对 child 过程有影响,但它没有意识到这一点。
我的假设是文件 object 存储在共享内存中,因此两个进程都在修改相同的文件 data/object。从 parent 和 child 过程中提取的 id(f)
似乎证实了这个想法,它们报告相同的数据。但是,我没有发现任何参考资料说明在使用 multiprocessing
包时文件 object 保存在共享内存中。
所以,我的问题是:这是预期的行为,还是我遗漏了一些明显的东西?
我想你想为每个子进程单独打开。不要将文件对象作为参数传递,而是尝试将路径传递给文件,然后在多处理调用的函数内部打开。
Python 正在使用 fork()
启动子进程,这会导致子进程从其父进程继承文件描述符。由于它们共享一个文件描述符,因此它们也共享相同的查找偏移量。来自 fork(2)
的联机帮助页:
The child inherits copies of the parent's set of open file
descriptors. Each file descriptor in the child refers to the same
open file description (see open(2)) as the corresponding file
descriptor in the parent. This means that the two file
descriptors share open file status flags, file offset, and signal-
driven I/O attributes (see the description of F_SETOWN and
F_SETSIG in fcntl(2)).
Python file
unix 上的对象是文件描述符的非常薄的包装器(Python 中的实现目前归结为一个 fdno 和一些关于路径的元数据;the seek()
method is just calling lseek(2)
), 所以将对象克隆到子进程基本上只是发送一个文件描述符。
我能想到的最简单的解决方案是将路径传递给子进程,并在每个进程中分别打开文件。你可能会用 os.dup
做一些棘手的事情,但我不确定除了在你生成新进程时节省一些字节之外还有什么好处。
我正在尝试使用 multiprocessing
包同时读取文件并在一些数据转换后覆盖(部分)文件。我知道这似乎有点抽象,但我可以利用这种并发性来加速我自己的 blocksync
fork.
您可以在下面找到我的代码片段:
#!/usr/bin/python2
import multiprocessing
import sys
import time
blocksize=1024
def do_open(f, mode):
f = open(f, mode)
f.seek(0, 2)
size = f.tell()
f.seek(0)
return f, size
def pipe_getblocks(f, pipe, side):
print "Child file object ID: "+str(id(f))
while True:
print "getblocks_seek_prev: "+str(f.tell())
block = f.read(blocksize)
if not block:
break
print "getblocks_seek_next: "+str(f.tell())
pipe.send(block)
def pipe_server(dev):
f, size = do_open(dev, 'r+')
parent,child = multiprocessing.Pipe(False)
reader = multiprocessing.Process(target=pipe_getblocks, args=(f,child,"R"))
reader.daemon = True
reader.start()
child.close()
i = 0
print "Parent file object ID:"+str(id(f))
while True:
try:
block = parent.recv()
except:
break
else:
print str(i)+":pseek: "+str(f.tell()/1024/1024)
f.seek(0,0) # This seek should not be see in the child subprocess...
i = i+1
pipe_server("/root/random.img")
基本上,parent 进程应该等待 child 填充管道,然后从中读取。请注意 f.seek(0,0)
行:我把它放在这里是为了验证 parent 和 child 各自对在文件中查找的位置有自己的想法。换句话说,作为两个完全不同的进程,我希望在 parent 上完成的 f.seek
对其 child.
然而,这个假设似乎是错误的,因为上面的程序产生了以下输出:
Child file object ID: 140374094691616
getblocks_seek_prev: 0
getblocks_seek_next: 1024
...
getblocks_seek_next: 15360
getblocks_seek_prev: 15360
getblocks_seek_next: 16384
getblocks_seek_prev: 16384
getblocks_seek_next: 17408 <-- past EOF!
getblocks_seek_prev: 17408 <-- past EOF!
getblocks_seek_next: 18432 <-- past EOF!
getblocks_seek_prev: 18432 <-- past EOF!
...
Parent file object ID:140374094691616
0:pseek: 0
1:pseek: 0
2:pseek: 0
3:pseek: 0
4:pseek: 0
5:pseek: 0
6:pseek: 0
7:pseek: 0
8:pseek: 0
9:pseek: 0
10:pseek: 0
...
如您所见,child 进程 读过了它的 EOF 或者,嗯,它 认为 是这样,因为它实际上是从文件的开头读取的。简而言之,似乎 parent 的 f.seek(0,0)
对 child 过程有影响,但它没有意识到这一点。
我的假设是文件 object 存储在共享内存中,因此两个进程都在修改相同的文件 data/object。从 parent 和 child 过程中提取的 id(f)
似乎证实了这个想法,它们报告相同的数据。但是,我没有发现任何参考资料说明在使用 multiprocessing
包时文件 object 保存在共享内存中。
所以,我的问题是:这是预期的行为,还是我遗漏了一些明显的东西?
我想你想为每个子进程单独打开。不要将文件对象作为参数传递,而是尝试将路径传递给文件,然后在多处理调用的函数内部打开。
Python 正在使用 fork()
启动子进程,这会导致子进程从其父进程继承文件描述符。由于它们共享一个文件描述符,因此它们也共享相同的查找偏移量。来自 fork(2)
的联机帮助页:
The child inherits copies of the parent's set of open file descriptors. Each file descriptor in the child refers to the same open file description (see open(2)) as the corresponding file descriptor in the parent. This means that the two file descriptors share open file status flags, file offset, and signal- driven I/O attributes (see the description of F_SETOWN and F_SETSIG in fcntl(2)).
Python file
unix 上的对象是文件描述符的非常薄的包装器(Python 中的实现目前归结为一个 fdno 和一些关于路径的元数据;the seek()
method is just calling lseek(2)
), 所以将对象克隆到子进程基本上只是发送一个文件描述符。
我能想到的最简单的解决方案是将路径传递给子进程,并在每个进程中分别打开文件。你可能会用 os.dup
做一些棘手的事情,但我不确定除了在你生成新进程时节省一些字节之外还有什么好处。