Python 通过对命名管道的非阻塞写入避免部分写入
Python avoid partial writes with non-blocking write to named pipe
我 运行 python3.8 linux.
在我的脚本中,我创建了一个命名管道,并按如下方式打开它:
import os
import posix
import time
file_name = 'fifo.txt'
os.mkfifo(file_name)
f = posix.open(file_name, os.O_RDWR | os.O_NONBLOCK)
os.set_blocking(f, False)
在没有打开文件以在其他地方读取(例如,使用 cat
)的情况下,我开始循环写入文件。
base_line = 'abcdefghijklmnopqrstuvwxyz'
s = base_line * 10000 + '\n'
while True:
try:
posix.write(f, s.encode())
except BlockingIOError as e:
print("Exception occurred: {}".format(e))
time.sleep(.5)
然后当我使用 cat
从命名管道读取时,我发现发生了部分写入。
我很困惑如何知道在此实例中写入了多少字节。由于抛出异常,我无权访问 return 值(写入的字节数)。文档表明 BlockingIOError
有一个名为 characters_written
的 属性,但是当我尝试访问该字段时会引发 AttributeError
。
总结:我怎样才能首先避免这种部分写入,或者至少知道在这种情况下部分写入了多少?
os.write
执行无缓冲写入。文档指出 BlockingIOError
只有一个 characters_written
属性,当缓冲写入操作会阻塞时。
如果在管道变满之前成功写入了任何字节,则将从 os.write
返回该字节数。否则,你会得到一个例外。当然,像驱动故障这样的事情也会导致异常,即使写入了一些字节。这与 POSIX write
的工作方式没有什么不同,除了在错误时不返回 -1,而是引发异常。
如果您不喜欢处理异常,可以在文件描述符周围使用包装器,例如 io.FileIO
对象。我已经修改了您的代码,因为它会在您每次循环回 os.write
调用时尝试写入整个缓冲区(如果失败一次,则每次都会失败):
import io
import os
import time
base_line = 'abcdefghijklmnopqrstuvwxyz'
data = (base_line * 10000 + '\n').encode()
file_name = 'fifo.txt'
os.mkfifo(file_name)
fd = os.open(file_name, os.O_RDWR | os.O_NONBLOCK)
# os.O_NONBLOCK makes os.set_blocking(fd, False) unnecessary.
with io.FileIO(fd, 'wb') as f:
written = 0
while written < len(data):
n = f.write(data[written:])
if n is None:
time.sleep(.5)
else:
written += n
顺便说一句,您可以使用 selectors
模块而不是 time.sleep
;我注意到由于睡眠延迟而尝试从管道读取时有轻微延迟,如果您使用 selectors
模块,则不会发生这种情况:
with io.FileIO(fd, 'wb') as f:
written = 0
sel = selectors.DefaultSelector()
sel.register(f, selectors.EVENT_WRITE)
while written < len(data):
n = f.write(data[written:])
if n is None:
# Wait here until we can start writing again.
sel.select()
else:
written += n
sel.unregister(f)
一些有用的信息也可以在 的答案中找到。
我 运行 python3.8 linux.
在我的脚本中,我创建了一个命名管道,并按如下方式打开它:
import os
import posix
import time
file_name = 'fifo.txt'
os.mkfifo(file_name)
f = posix.open(file_name, os.O_RDWR | os.O_NONBLOCK)
os.set_blocking(f, False)
在没有打开文件以在其他地方读取(例如,使用 cat
)的情况下,我开始循环写入文件。
base_line = 'abcdefghijklmnopqrstuvwxyz'
s = base_line * 10000 + '\n'
while True:
try:
posix.write(f, s.encode())
except BlockingIOError as e:
print("Exception occurred: {}".format(e))
time.sleep(.5)
然后当我使用 cat
从命名管道读取时,我发现发生了部分写入。
我很困惑如何知道在此实例中写入了多少字节。由于抛出异常,我无权访问 return 值(写入的字节数)。文档表明 BlockingIOError
有一个名为 characters_written
的 属性,但是当我尝试访问该字段时会引发 AttributeError
。
总结:我怎样才能首先避免这种部分写入,或者至少知道在这种情况下部分写入了多少?
os.write
执行无缓冲写入。文档指出 BlockingIOError
只有一个 characters_written
属性,当缓冲写入操作会阻塞时。
如果在管道变满之前成功写入了任何字节,则将从 os.write
返回该字节数。否则,你会得到一个例外。当然,像驱动故障这样的事情也会导致异常,即使写入了一些字节。这与 POSIX write
的工作方式没有什么不同,除了在错误时不返回 -1,而是引发异常。
如果您不喜欢处理异常,可以在文件描述符周围使用包装器,例如 io.FileIO
对象。我已经修改了您的代码,因为它会在您每次循环回 os.write
调用时尝试写入整个缓冲区(如果失败一次,则每次都会失败):
import io
import os
import time
base_line = 'abcdefghijklmnopqrstuvwxyz'
data = (base_line * 10000 + '\n').encode()
file_name = 'fifo.txt'
os.mkfifo(file_name)
fd = os.open(file_name, os.O_RDWR | os.O_NONBLOCK)
# os.O_NONBLOCK makes os.set_blocking(fd, False) unnecessary.
with io.FileIO(fd, 'wb') as f:
written = 0
while written < len(data):
n = f.write(data[written:])
if n is None:
time.sleep(.5)
else:
written += n
顺便说一句,您可以使用 selectors
模块而不是 time.sleep
;我注意到由于睡眠延迟而尝试从管道读取时有轻微延迟,如果您使用 selectors
模块,则不会发生这种情况:
with io.FileIO(fd, 'wb') as f:
written = 0
sel = selectors.DefaultSelector()
sel.register(f, selectors.EVENT_WRITE)
while written < len(data):
n = f.write(data[written:])
if n is None:
# Wait here until we can start writing again.
sel.select()
else:
written += n
sel.unregister(f)
一些有用的信息也可以在