命名管道内容在只读一行时被丢弃
Named pipe contents are discarded when read only a single line
我可能误解了什么,但命名管道应该像这样吗?
# consumer:
while True:
queue: int = os.open('pipe', flags=os.O_RDONLY | os.O_NONBLOCK)
with os.fdopen(queue, 'rb') as stream:
readers, _, _ = select([stream], [], [])
if readers:
reader = readers.pop()
contents: bytes = reader.readline().strip()
if b'quit' == contents:
break
print(contents)
# producer:
fd = os.open(pipe, os.O_WRONLY | os.O_NONBLOCK)
ps = open(fd, 'wb')
for i in range(10):
ps.write(str(i).encode())
ps.write(os.linesep.encode())
ps.close()
我可以看到所有数据都被写入管道,文件关闭后,消费者中的 select 将其拾取并开始读取...这是输出:
b'0'
管道的所有其余部分都被丢弃,就像它从未存在过一样。这是预期的行为吗?我的期望是打印:
b'0'
b'1'
...
b'9'
我想使用命名管道进行进程间通信。脚本 A 正在向独立脚本 B 发送命令,然后可能会发送另外三个命令。 B应该拿起那些命令,一个接一个地执行它们。因此上面的代码。但是,只有第一个被执行,其余的都消失了。命令不像上面的例子那样出现。
- cmd1+cmd2
- 10 秒后
- cmd3
- 5 秒后
- cmd4+cmd5+cmd6
- 20 秒后
- 退出 <- 此时脚本 B 终止
我怎样才能做到这一点?
由于某些神奇的原因,select 第一次挂起。
write(hello)
write(newline)
write(world)
flush()
print(hello) <- select hangs
... one eternity later ...
write(how)
write(newline)
write(are)
write(newline)
write(ya)
flush()
print(world) <- This should have been printed also without the new writes...
print(how) <- there were no new writes yet select didn't block as there was new data available for reading
print(are)
print(ya)
通过向 select 语句添加超时 1,我不需要虚拟写入来读取管道的其余部分。不确定这是否是 select 限制,但确实看起来有问题,特别是从第二次开始它按预期工作。
更新 #1
问题是消费者在每个 select 之间关闭并重新打开管道。
它只消耗了一行,并且在 关闭 时管道中等待读取的所有其他数据都消失了。
因此你应该用 fdopen
置换 while True
。此外,您应该在消费者中正确处理 EOF(这是第一个额外 while True
的来源)。这是固定代码:
消费者
import os, sys
import select
while True:
print("Opening pipe")
queue: int = os.open('pipe', flags=os.O_RDONLY)
with os.fdopen(queue, 'rb') as stream:
while True:
readers, _, _ = select.select([stream], [], [])
if readers:
reader = readers.pop()
contents: bytes = reader.readline()
if contents is b'':
print("EOF detected")
break
contents: bytes = contents.strip()
if b'quit' == contents:
sys.exit(0)
print(contents)
制作人
生产者缺少 flush 调用:
import os
fd = os.open('pipe', os.O_WRONLY)
ps = open(fd, 'wb')
for i in range(10):
ps.write(str(i).encode())
ps.write(os.linesep.encode())
ps.flush()
ps.close()
运行 消费者在 strace 下对分析你的代码有很大帮助,它揭示了所有系统调用,我可以注意到 closes在 select 秒之间。
旧的过时答案
行中:
reader = readers.pop()
您从管道获得的内容不仅仅是第一个字节。
但是然后你只打印出你得到的第一个字节。
看到当你在消费者代码的末尾添加另一个读取 reader:
contents2: bytes = reader.readline().strip()
print(contents2)
您将获得:
b'0'
b'1'
当然完整的代码应该测试它从管道中得到了多少。打印它然后等待更多数据出现在管道中。
这是我更新的消费者代码:
import os, sys
import select
while True:
queue: int = os.open('pipe', flags=os.O_RDONLY | os.O_NONBLOCK)
with os.fdopen(queue, 'rb') as stream:
readers, _, _ = select.select([stream], [], [])
if readers:
reader = readers.pop()
while True:
contents: bytes = reader.readline().strip()
if contents is b'':
break
if b'quit' == contents:
sys.exit(0)
print(contents)
我可能误解了什么,但命名管道应该像这样吗?
# consumer:
while True:
queue: int = os.open('pipe', flags=os.O_RDONLY | os.O_NONBLOCK)
with os.fdopen(queue, 'rb') as stream:
readers, _, _ = select([stream], [], [])
if readers:
reader = readers.pop()
contents: bytes = reader.readline().strip()
if b'quit' == contents:
break
print(contents)
# producer:
fd = os.open(pipe, os.O_WRONLY | os.O_NONBLOCK)
ps = open(fd, 'wb')
for i in range(10):
ps.write(str(i).encode())
ps.write(os.linesep.encode())
ps.close()
我可以看到所有数据都被写入管道,文件关闭后,消费者中的 select 将其拾取并开始读取...这是输出:
b'0'
管道的所有其余部分都被丢弃,就像它从未存在过一样。这是预期的行为吗?我的期望是打印:
b'0'
b'1'
...
b'9'
我想使用命名管道进行进程间通信。脚本 A 正在向独立脚本 B 发送命令,然后可能会发送另外三个命令。 B应该拿起那些命令,一个接一个地执行它们。因此上面的代码。但是,只有第一个被执行,其余的都消失了。命令不像上面的例子那样出现。
- cmd1+cmd2
- 10 秒后
- cmd3
- 5 秒后
- cmd4+cmd5+cmd6
- 20 秒后
- 退出 <- 此时脚本 B 终止
我怎样才能做到这一点?
由于某些神奇的原因,select 第一次挂起。
write(hello)
write(newline)
write(world)
flush()
print(hello) <- select hangs
... one eternity later ...
write(how)
write(newline)
write(are)
write(newline)
write(ya)
flush()
print(world) <- This should have been printed also without the new writes...
print(how) <- there were no new writes yet select didn't block as there was new data available for reading
print(are)
print(ya)
通过向 select 语句添加超时 1,我不需要虚拟写入来读取管道的其余部分。不确定这是否是 select 限制,但确实看起来有问题,特别是从第二次开始它按预期工作。
更新 #1
问题是消费者在每个 select 之间关闭并重新打开管道。 它只消耗了一行,并且在 关闭 时管道中等待读取的所有其他数据都消失了。
因此你应该用 fdopen
置换 while True
。此外,您应该在消费者中正确处理 EOF(这是第一个额外 while True
的来源)。这是固定代码:
消费者
import os, sys
import select
while True:
print("Opening pipe")
queue: int = os.open('pipe', flags=os.O_RDONLY)
with os.fdopen(queue, 'rb') as stream:
while True:
readers, _, _ = select.select([stream], [], [])
if readers:
reader = readers.pop()
contents: bytes = reader.readline()
if contents is b'':
print("EOF detected")
break
contents: bytes = contents.strip()
if b'quit' == contents:
sys.exit(0)
print(contents)
制作人
生产者缺少 flush 调用:
import os
fd = os.open('pipe', os.O_WRONLY)
ps = open(fd, 'wb')
for i in range(10):
ps.write(str(i).encode())
ps.write(os.linesep.encode())
ps.flush()
ps.close()
运行 消费者在 strace 下对分析你的代码有很大帮助,它揭示了所有系统调用,我可以注意到 closes在 select 秒之间。
旧的过时答案
行中:
reader = readers.pop()
您从管道获得的内容不仅仅是第一个字节。 但是然后你只打印出你得到的第一个字节。
看到当你在消费者代码的末尾添加另一个读取 reader:
contents2: bytes = reader.readline().strip()
print(contents2)
您将获得:
b'0'
b'1'
当然完整的代码应该测试它从管道中得到了多少。打印它然后等待更多数据出现在管道中。
这是我更新的消费者代码:
import os, sys
import select
while True:
queue: int = os.open('pipe', flags=os.O_RDONLY | os.O_NONBLOCK)
with os.fdopen(queue, 'rb') as stream:
readers, _, _ = select.select([stream], [], [])
if readers:
reader = readers.pop()
while True:
contents: bytes = reader.readline().strip()
if contents is b'':
break
if b'quit' == contents:
sys.exit(0)
print(contents)