使用 subprocess.Popen 将数据流式传输到命令中

streaming data into command with subprocess.Popen

我经常需要对 collection 个包含 header 的文件进行排序。因为排序取决于 header 的内容,所以这个用例比类似的问题(例如 Is there a way to ignore header lines in a UNIX sort?)更复杂。

我希望使用Python读取文件,输出第一个文件的header,然后将尾部通过管道进行排序。我试过这个作为概念证明:

#!/usr/bin/env python

import io
import subprocess
import sys

header_printed = False

sorter = subprocess.Popen(['sort'], stdin=subprocess.PIPE)

for f in sys.argv[1:]:
    fd = io.open(f,'r')
    line = fd.readline()
    if not header_printed:
        print(line)
        header_printed = True
    sorter.communicate(line)

当调用为 header-sort fileA fileB 时,文件 A 和文件 B 包含类似

的行
c   float   int
Y   0.557946     413
F   0.501935     852
F   0.768102     709

我得到:

# sort file 1
Traceback (most recent call last):
  File "./archive/bin/pipetest", line 17, in <module>
    sorter.communicate(line)
  File "/usr/lib/python2.7/subprocess.py", line 785, in communicate
    self.stdin.write(input)
ValueError: I/O operation on closed file

问题是通信需要一个字符串,写入后管道关闭。这意味着必须将内容完全读入内存。通信不需要发电机(我试过)。

一个更简单的演示是:

>>> import subprocess
>>> p = subprocess.Popen(['tr', 'a-z', 'A-Z'], stdin=subprocess.PIPE)
>>> p.communicate('hello')
HELLO(None, None)
>>> p.communicate('world')
Traceback (most recent call last):
  File "<ipython-input-14-d6873fd0f66a>", line 1, in <module>
    p.communicate('world')
  File "/usr/lib/python2.7/subprocess.py", line 785, in communicate
    self.stdin.write(input)
ValueError: I/O operation on closed file

所以,问题是,在 Python 中将数据流式传输到管道的正确方法是什么(使用 Popen 或其他方式)?

您可以使用 stdinstdout 中的 writing/reading,但是根据您的子流程,您需要 "flushing mechanism" 以便子流程处理您的输入。下面的代码适用于第一部分,但由于它关闭了 stdin,它也会终止子进程。如果你用 flush() 改变它,或者如果你可以添加一些尾随字符来推送你的子进程,那么你就可以使用它。否则,我建议看一下 Multithreading in Python,尤其是 pipes.

p=subprocess.Popen(['tr','a-z','A-Z'],stdin=subprocess.PIPE,stdout=subprocess.PIPE)
p.stdin.write("hello\n")
p.stdin.close()
p.stdout.readline()
'HELLO\n'

对于您的具体情况,如果您只为单个标准句柄传递 subprocess.PIPE(在您的情况下,stdin),那么在您的示例中,您可以安全地调用 sorter.stdin.write(line)一遍又一遍。当你写完输出后,调用 sorter.stdin.close() 这样 sort 就知道输入已经完成,并且它可以执行实际的排序和输出工作(sorter.communicate() 没有参数可能也可以;否则,在关闭 stdin 之后你可能想要调用 sorter.wait() 让它完成。

如果您需要处理多个管道标准句柄,正确的方法是 threading with a dedicated thread for each pipe that must be handled beyond the first (relatively simple in concept, but heavyweight and introduces all the headaches of threading), or using the select module (or in Python 3.4+, the selectors module), which is quite tricky to get right, but can (under some circumstances) be more efficient. Lastly, there is creating temporary files for output,这样您可以在进程写入文件时直接写入进程的 stdin (因此不会阻塞);然后您可以在闲暇时阅读该文件(请注意,子进程在退出之前不一定会刷新自己的输出缓冲区,因此在进一步的输入和输出已填充并刷新之前,输出可能无法及时响应您的输入缓冲区)。

subprocess.Popen.communicate() 方法使用线程或 select 模块原语本身(取决于 OS 支持;实现在 various _communicate methods here ) 每当您为多个标准句柄传递 subprocess.PIPE 时;这就是你必须做的。

直接写入管道即可:

#!/usr/bin/env python2
import fileinput
import subprocess

process = subprocess.Popen(['sort'], stdin=subprocess.PIPE)
with process.stdin as pipe, fileinput.FileInput() as file:
    for line in file:
        if file.isfirstline(): # print header
            print line,
        else: # pipe tails
            pipe.write(line)
process.wait()