为什么在管道组件上调用 communicate() 而不是最后一个会产生损坏的输出?

Why can calling communicate() on a pipeline component other than the last yield corrupted output?

我想计算一个大型 .fastq 文件(5900 万行)中大约 500 个模式的出现次数。这些模式都是 20 个字符长。

在 unix 中这很简单:

grep -F -o -f patterns.txt big_file.fastq | sort | uniq -c

但是,我希望避免编写临时模式文件,所以我使用 python 的子进程库创建了一个管道:

from subprocess import Popen, PIPE, STDOUT

p1 = Popen(["grep", "-F", "-o", "-f", "-", "big_file.fastq"], shell = False, stdin = PIPE, stdout = PIPE, stderr= STDOUT)
p2 = Popen(["sort"], shell = False, stdin = p1.stdout, stdout = PIPE, stderr = STDOUT)
p3 = Popen(["uniq", "-c"], shell = False, stdin = p2.stdout, stdout = PIPE, stderr = STDOUT)

然后我在此调用 communicate(),提供编码的 io.StringIO 类文件对象作为输入(我使用“-”将其传递给 grep 命令):

import io

patterns_file = io.StringIO("\n".join(patterns_list))
p3.communicate(input = patterns_file.read().encode('utf-8'))[0]

当我像这样在 uniq 上调用 communicate() 时,效果很好。

但是,在测试时我错误地在管道的第一部分调用了它:

p1.communicate(input = patterns_file.read().encode('utf-8'))[0]

这给了我完全错误的输出,包括比预期的 20 个字符短或长的匹配项。

我不明白这是为什么。在 p1 上调用 communicate() 不会只涉及管道的那一部分而忽略其余部分吗?删除 p2 和 p3 导致 p1 正确 grep。我觉得我缺少有关 Popen 工作原理的一些信息。

感谢任何帮助。

当您实例化 Popen 个对象时,它们引用的子进程会立即启动。因此,即使您只在 p1 上调用 communicate()p2p3 也是 运行.

为什么这很重要?因为 p2 仍然将其标准输入附加到 p1 正在写入其输出的 FIFO!

如果 p2 上的 sort 操作仍在读取内容,同时您要求 Python 程序直接读取相同的内容,您最终会得到 p1的输出在他们之间分配。可以预期随之而来的是欢闹:在两个程序之间拆分读取的唯一方法 不会 导致明显损坏的数据是如果 p2communicate() 都在读取块是 20 字节的倍数(但仍然足够小,操作系统不会将它们分成多个系统调用);但是,用于无缓冲读取的典型块大小是 4096 的倍数,每次读取块时都会从记录边界创建 4 个字节的偏移量。


顺便说一句——对于许多 程序,这不会产生如此严重的影响,因为 FIFO 的缓冲区相对较小;为它读取的每一行输入写入一行输出的程序最终会很快阻塞在输出上,因此将停止读取进一步的输入,直到其输出至少被部分刷新。 sort 是个例外,因为它需要读取所有输入才能知道它的第一行输出是什么!