命名管道内容在只读一行时被丢弃

Named pipe contents are discarded when read only a single line

我可能误解了什么,但命名管道应该像这样吗?

# consumer:
while True:
  queue: int = os.open('pipe', flags=os.O_RDONLY | os.O_NONBLOCK)
  with os.fdopen(queue, 'rb') as stream:
    readers, _, _ = select([stream], [], [])
    if readers:
      reader = readers.pop()
      contents: bytes = reader.readline().strip()

      if b'quit' == contents:
        break

      print(contents)

# producer:
fd = os.open(pipe, os.O_WRONLY | os.O_NONBLOCK)
ps = open(fd, 'wb')
for i in range(10):
  ps.write(str(i).encode())
  ps.write(os.linesep.encode())
ps.close()

我可以看到所有数据都被写入管道,文件关闭后,消费者中的 select 将其拾取并开始读取...这是输出:

b'0'

管道的所有其余部分都被丢弃,就像它从未存在过一样。这是预期的行为吗?我的期望是打印:

b'0'
b'1'
...
b'9'

我想使用命名管道进行进程间通信。脚本 A 正在向独立脚本 B 发送命令,然后可能会发送另外三个命令。 B应该拿起那些命令,一个接一个地执行它们。因此上面的代码。但是,只有第一个被执行,其余的都消失了。命令不像上面的例子那样出现。

我怎样才能做到这一点?

由于某些神奇的原因,select 第一次挂起。

write(hello)
write(newline)
write(world)
flush()
print(hello) <- select hangs
... one eternity later ...
write(how)
write(newline)
write(are)
write(newline)
write(ya)
flush()
print(world) <- This should have been printed also without the new writes...
print(how) <- there were no new writes yet select didn't block as there was new data available for reading
print(are)
print(ya)

通过向 select 语句添加超时 1,我不需要虚拟写入来读取管道的其余部分。不确定这是否是 select 限制,但确实看起来有问题,特别是从第二次开始它按预期工作。

更新 #1

问题是消费者在每个 select 之间关闭并重新打开管道。 它只消耗了一行,并且在 关闭 时管道中等待读取的所有其他数据都消失了。

因此你应该用 fdopen 置换 while True。此外,您应该在消费者中正确处理 EOF(这是第一个额外 while True 的来源)。这是固定代码:

消费者

import os, sys
import select

while True:

    print("Opening pipe")
    queue: int = os.open('pipe', flags=os.O_RDONLY)
    with os.fdopen(queue, 'rb') as stream:

        while True:
            readers, _, _ = select.select([stream], [], [])

            if readers:
                reader = readers.pop()

                contents: bytes = reader.readline()
                if contents is b'':
                    print("EOF detected")
                    break

                contents: bytes = contents.strip()
                if b'quit' == contents:
                    sys.exit(0)

                print(contents)

制作人

生产者缺少 flush 调用:

import os

fd = os.open('pipe', os.O_WRONLY)
ps = open(fd, 'wb')
for i in range(10):
    ps.write(str(i).encode())
    ps.write(os.linesep.encode())
    ps.flush()

ps.close()

运行 消费者在 strace 下对分析你的代码有很大帮助,它揭示了所有系统调用,我可以注意到 closes在 select 秒之间。


旧的过时答案

行中:

reader = readers.pop()

您从管道获得的内容不仅仅是第一个字节。 但是然后你只打印出你得到的第一个字节。

看到当你在消费者代码的末尾添加另一个读取 reader:

contents2: bytes = reader.readline().strip()
print(contents2)

您将获得:

b'0'
b'1'

当然完整的代码应该测试它从管道中得到了多少。打印它然后等待更多数据出现在管道中。

这是我更新的消费者代码:

import os, sys
import select

while True:
    queue: int = os.open('pipe', flags=os.O_RDONLY | os.O_NONBLOCK)
    with os.fdopen(queue, 'rb') as stream:
        readers, _, _ = select.select([stream], [], [])
        if readers:
            reader = readers.pop()
            while True:
                contents: bytes = reader.readline().strip()
                if contents is b'':
                    break
                if b'quit' == contents:
                    sys.exit(0)

                print(contents)