Python 在保留顺序的同时分别从子进程 stdout 和 stderr 读取
Python read from subprocess stdout and stderr separately while preserving order
我有一个 python 子进程,我正在尝试从中读取输出和错误流。目前我可以使用它,但我只能在从 stdout
读取完后才能从 stderr
读取。这是它的样子:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout_iterator = iter(process.stdout.readline, b"")
stderr_iterator = iter(process.stderr.readline, b"")
for line in stdout_iterator:
# Do stuff with line
print line
for line in stderr_iterator:
# Do stuff with line
print line
如您所见,stderr
for 循环只有在 stdout
循环完成后才能开始。我怎样才能修改它以便能够以正确的行进入顺序从两者中读取?
澄清一下: 我仍然需要能够分辨出一行是来自 stdout
还是 stderr
,因为它们在我的文档中会被区别对待代码。
如果子进程在 stderr 上产生足够的输出(在我的 Linux 机器上大约 100KB),您问题中的代码可能会死锁。
有一个 communicate()
方法允许分别从 stdout 和 stderr 读取:
from subprocess import Popen, PIPE
process = Popen(command, stdout=PIPE, stderr=PIPE)
output, err = process.communicate()
如果您需要在子进程仍在运行时读取流 运行,那么可移植的解决方案是使用线程(未测试):
from subprocess import Popen, PIPE
from threading import Thread
from Queue import Queue # Python 2
def reader(pipe, queue):
try:
with pipe:
for line in iter(pipe.readline, b''):
queue.put((pipe, line))
finally:
queue.put(None)
process = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=1)
q = Queue()
Thread(target=reader, args=[process.stdout, q]).start()
Thread(target=reader, args=[process.stderr, q]).start()
for _ in range(2):
for source, line in iter(q.get, None):
print "%s: %s" % (source, line),
参见:
- Python: read streaming input from subprocess.communicate()
- Non-blocking read on a subprocess.PIPE in python
- Python subprocess get children's output to file and terminal?
进程将数据写入不同管道的顺序在写入后丢失。
无法判断 stdout 是否在 stderr 之前写入。
您可以尝试以非阻塞方式同时从多个文件描述符读取数据
一旦数据可用,但这只会最大限度地减少订单不正确的可能性。
这个程序应该证明这一点:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import select
import subprocess
testapps={
'slow': '''
import os
import time
os.write(1, 'aaa')
time.sleep(0.01)
os.write(2, 'bbb')
time.sleep(0.01)
os.write(1, 'ccc')
''',
'fast': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbb')
os.write(1, 'ccc')
''',
'fast2': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbbbbbbbbbbbbbb')
os.write(1, 'ccc')
'''
}
def readfds(fds, maxread):
while True:
fdsin, _, _ = select.select(fds,[],[])
for fd in fdsin:
s = os.read(fd, maxread)
if len(s) == 0:
fds.remove(fd)
continue
yield fd, s
if fds == []:
break
def readfromapp(app, rounds=10, maxread=1024):
f=open('testapp.py', 'w')
f.write(testapps[app])
f.close()
results={}
for i in range(0, rounds):
p = subprocess.Popen(['python', 'testapp.py'], stdout=subprocess.PIPE
, stderr=subprocess.PIPE)
data=''
for (fd, s) in readfds([p.stdout.fileno(), p.stderr.fileno()], maxread):
data = data + s
results[data] = results[data] + 1 if data in results else 1
print 'running %i rounds %s with maxread=%i' % (rounds, app, maxread)
results = sorted(results.items(), key=lambda (k,v): k, reverse=False)
for data, count in results:
print '%03i x %s' % (count, data)
print
print "=> if output is produced slowly this should work as whished"
print " and should return: aaabbbccc"
readfromapp('slow', rounds=100, maxread=1024)
print
print "=> now mostly aaacccbbb is returnd, not as it should be"
readfromapp('fast', rounds=100, maxread=1024)
print
print "=> you could try to read data one by one, and return"
print " e.g. a whole line only when LF is read"
print " (b's should be finished before c's)"
readfromapp('fast', rounds=100, maxread=1)
print
print "=> but even this won't work ..."
readfromapp('fast2', rounds=100, maxread=1)
并输出如下内容:
=> if output is produced slowly this should work as whished
and should return: aaabbbccc
running 100 rounds slow with maxread=1024
100 x aaabbbccc
=> now mostly aaacccbbb is returnd, not as it should be
running 100 rounds fast with maxread=1024
006 x aaabbbccc
094 x aaacccbbb
=> you could try to read data one by one, and return
e.g. a whole line only when LF is read
(b's should be finished before c's)
running 100 rounds fast with maxread=1
003 x aaabbbccc
003 x aababcbcc
094 x abababccc
=> but even this won't work ...
running 100 rounds fast2 with maxread=1
003 x aaabbbbbbbbbbbbbbbccc
001 x aaacbcbcbbbbbbbbbbbbb
008 x aababcbcbcbbbbbbbbbbb
088 x abababcbcbcbbbbbbbbbb
我知道这个问题很老,但这个答案可能会帮助偶然发现此页面的其他人研究类似情况的解决方案,所以我还是要发布它。
我构建了一个简单的 python 片段,可以将任意数量的管道合并为一个。当然,如上所述,不能保证顺序,但这是我认为你能得到的最接近的Python。
它为每个管道生成一个线程,逐行读取它们并将它们放入队列(即 FIFO)中。主线程循环遍历队列,生成每一行。
import threading, queue
def merge_pipes(**named_pipes):
r'''
Merges multiple pipes from subprocess.Popen (maybe other sources as well).
The keyword argument keys will be used in the output to identify the source
of the line.
Example:
p = subprocess.Popen(['some', 'call'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
outputs = {'out': log.info, 'err': log.warn}
for name, line in merge_pipes(out=p.stdout, err=p.stderr):
outputs[name](line)
This will output stdout to the info logger, and stderr to the warning logger
'''
# Constants. Could also be placed outside of the method. I just put them here
# so the method is fully self-contained
PIPE_OPENED=1
PIPE_OUTPUT=2
PIPE_CLOSED=3
# Create a queue where the pipes will be read into
output = queue.Queue()
# This method is the run body for the threads that are instatiated below
# This could be easily rewritten to be outside of the merge_pipes method,
# but to make it fully self-contained I put it here
def pipe_reader(name, pipe):
r"""
reads a single pipe into the queue
"""
output.put( ( PIPE_OPENED, name, ) )
try:
for line in iter(pipe.readline,''):
output.put( ( PIPE_OUTPUT, name, line.rstrip(), ) )
finally:
output.put( ( PIPE_CLOSED, name, ) )
# Start a reader for each pipe
for name, pipe in named_pipes.items():
t=threading.Thread(target=pipe_reader, args=(name, pipe, ))
t.daemon = True
t.start()
# Use a counter to determine how many pipes are left open.
# If all are closed, we can return
pipe_count = 0
# Read the queue in order, blocking if there's no data
for data in iter(output.get,''):
code=data[0]
if code == PIPE_OPENED:
pipe_count += 1
elif code == PIPE_CLOSED:
pipe_count -= 1
elif code == PIPE_OUTPUT:
yield data[1:]
if pipe_count == 0:
return
这对我有用(windows):
https://github.com/waszil/subpiper
from subpiper import subpiper
def my_stdout_callback(line: str):
print(f'STDOUT: {line}')
def my_stderr_callback(line: str):
print(f'STDERR: {line}')
my_additional_path_list = [r'c:\important_location']
retcode = subpiper(cmd='echo magic',
stdout_callback=my_stdout_callback,
stderr_callback=my_stderr_callback,
add_path_list=my_additional_path_list)
这是一个基于 selectors
的解决方案,但它保留了顺序,并流式传输 variable-length 个字符(甚至是单个字符)。
诀窍是使用 read1()
,而不是 read()
。
import selectors
import subprocess
import sys
p = subprocess.Popen(
["python", "random_out.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)
while True:
for key, _ in sel.select():
data = key.fileobj.read1().decode()
if not data:
exit()
if key.fileobj is p.stdout:
print(data, end="")
else:
print(data, end="", file=sys.stderr)
如果你想要一个测试程序,使用这个。
import sys
from time import sleep
for i in range(10):
print(f" x{i} ", file=sys.stderr, end="")
sleep(0.1)
print(f" y{i} ", end="")
sleep(0.1)
这适用于 Python3 (3.6):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True)
# Read both stdout and stderr simultaneously
sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)
ok = True
while ok:
for key, val1 in sel.select():
line = key.fileobj.readline()
if not line:
ok = False
break
if key.fileobj is p.stdout:
print(f"STDOUT: {line}", end="")
else:
print(f"STDERR: {line}", end="", file=sys.stderr)
来自 https://docs.python.org/3/library/subprocess.html#using-the-subprocess-module
If you wish to capture and combine both streams into one, use
stdout=PIPE and stderr=STDOUT instead of capture_output.
所以最简单的解决方案是:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout_iterator = iter(process.stdout.readline, b"")
for line in stdout_iterator:
# Do stuff with line
print line
我有一个 python 子进程,我正在尝试从中读取输出和错误流。目前我可以使用它,但我只能在从 stdout
读取完后才能从 stderr
读取。这是它的样子:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout_iterator = iter(process.stdout.readline, b"")
stderr_iterator = iter(process.stderr.readline, b"")
for line in stdout_iterator:
# Do stuff with line
print line
for line in stderr_iterator:
# Do stuff with line
print line
如您所见,stderr
for 循环只有在 stdout
循环完成后才能开始。我怎样才能修改它以便能够以正确的行进入顺序从两者中读取?
澄清一下: 我仍然需要能够分辨出一行是来自 stdout
还是 stderr
,因为它们在我的文档中会被区别对待代码。
如果子进程在 stderr 上产生足够的输出(在我的 Linux 机器上大约 100KB),您问题中的代码可能会死锁。
有一个 communicate()
方法允许分别从 stdout 和 stderr 读取:
from subprocess import Popen, PIPE
process = Popen(command, stdout=PIPE, stderr=PIPE)
output, err = process.communicate()
如果您需要在子进程仍在运行时读取流 运行,那么可移植的解决方案是使用线程(未测试):
from subprocess import Popen, PIPE
from threading import Thread
from Queue import Queue # Python 2
def reader(pipe, queue):
try:
with pipe:
for line in iter(pipe.readline, b''):
queue.put((pipe, line))
finally:
queue.put(None)
process = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=1)
q = Queue()
Thread(target=reader, args=[process.stdout, q]).start()
Thread(target=reader, args=[process.stderr, q]).start()
for _ in range(2):
for source, line in iter(q.get, None):
print "%s: %s" % (source, line),
参见:
- Python: read streaming input from subprocess.communicate()
- Non-blocking read on a subprocess.PIPE in python
- Python subprocess get children's output to file and terminal?
进程将数据写入不同管道的顺序在写入后丢失。
无法判断 stdout 是否在 stderr 之前写入。
您可以尝试以非阻塞方式同时从多个文件描述符读取数据 一旦数据可用,但这只会最大限度地减少订单不正确的可能性。
这个程序应该证明这一点:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import select
import subprocess
testapps={
'slow': '''
import os
import time
os.write(1, 'aaa')
time.sleep(0.01)
os.write(2, 'bbb')
time.sleep(0.01)
os.write(1, 'ccc')
''',
'fast': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbb')
os.write(1, 'ccc')
''',
'fast2': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbbbbbbbbbbbbbb')
os.write(1, 'ccc')
'''
}
def readfds(fds, maxread):
while True:
fdsin, _, _ = select.select(fds,[],[])
for fd in fdsin:
s = os.read(fd, maxread)
if len(s) == 0:
fds.remove(fd)
continue
yield fd, s
if fds == []:
break
def readfromapp(app, rounds=10, maxread=1024):
f=open('testapp.py', 'w')
f.write(testapps[app])
f.close()
results={}
for i in range(0, rounds):
p = subprocess.Popen(['python', 'testapp.py'], stdout=subprocess.PIPE
, stderr=subprocess.PIPE)
data=''
for (fd, s) in readfds([p.stdout.fileno(), p.stderr.fileno()], maxread):
data = data + s
results[data] = results[data] + 1 if data in results else 1
print 'running %i rounds %s with maxread=%i' % (rounds, app, maxread)
results = sorted(results.items(), key=lambda (k,v): k, reverse=False)
for data, count in results:
print '%03i x %s' % (count, data)
print
print "=> if output is produced slowly this should work as whished"
print " and should return: aaabbbccc"
readfromapp('slow', rounds=100, maxread=1024)
print
print "=> now mostly aaacccbbb is returnd, not as it should be"
readfromapp('fast', rounds=100, maxread=1024)
print
print "=> you could try to read data one by one, and return"
print " e.g. a whole line only when LF is read"
print " (b's should be finished before c's)"
readfromapp('fast', rounds=100, maxread=1)
print
print "=> but even this won't work ..."
readfromapp('fast2', rounds=100, maxread=1)
并输出如下内容:
=> if output is produced slowly this should work as whished
and should return: aaabbbccc
running 100 rounds slow with maxread=1024
100 x aaabbbccc
=> now mostly aaacccbbb is returnd, not as it should be
running 100 rounds fast with maxread=1024
006 x aaabbbccc
094 x aaacccbbb
=> you could try to read data one by one, and return
e.g. a whole line only when LF is read
(b's should be finished before c's)
running 100 rounds fast with maxread=1
003 x aaabbbccc
003 x aababcbcc
094 x abababccc
=> but even this won't work ...
running 100 rounds fast2 with maxread=1
003 x aaabbbbbbbbbbbbbbbccc
001 x aaacbcbcbbbbbbbbbbbbb
008 x aababcbcbcbbbbbbbbbbb
088 x abababcbcbcbbbbbbbbbb
我知道这个问题很老,但这个答案可能会帮助偶然发现此页面的其他人研究类似情况的解决方案,所以我还是要发布它。
我构建了一个简单的 python 片段,可以将任意数量的管道合并为一个。当然,如上所述,不能保证顺序,但这是我认为你能得到的最接近的Python。
它为每个管道生成一个线程,逐行读取它们并将它们放入队列(即 FIFO)中。主线程循环遍历队列,生成每一行。
import threading, queue
def merge_pipes(**named_pipes):
r'''
Merges multiple pipes from subprocess.Popen (maybe other sources as well).
The keyword argument keys will be used in the output to identify the source
of the line.
Example:
p = subprocess.Popen(['some', 'call'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
outputs = {'out': log.info, 'err': log.warn}
for name, line in merge_pipes(out=p.stdout, err=p.stderr):
outputs[name](line)
This will output stdout to the info logger, and stderr to the warning logger
'''
# Constants. Could also be placed outside of the method. I just put them here
# so the method is fully self-contained
PIPE_OPENED=1
PIPE_OUTPUT=2
PIPE_CLOSED=3
# Create a queue where the pipes will be read into
output = queue.Queue()
# This method is the run body for the threads that are instatiated below
# This could be easily rewritten to be outside of the merge_pipes method,
# but to make it fully self-contained I put it here
def pipe_reader(name, pipe):
r"""
reads a single pipe into the queue
"""
output.put( ( PIPE_OPENED, name, ) )
try:
for line in iter(pipe.readline,''):
output.put( ( PIPE_OUTPUT, name, line.rstrip(), ) )
finally:
output.put( ( PIPE_CLOSED, name, ) )
# Start a reader for each pipe
for name, pipe in named_pipes.items():
t=threading.Thread(target=pipe_reader, args=(name, pipe, ))
t.daemon = True
t.start()
# Use a counter to determine how many pipes are left open.
# If all are closed, we can return
pipe_count = 0
# Read the queue in order, blocking if there's no data
for data in iter(output.get,''):
code=data[0]
if code == PIPE_OPENED:
pipe_count += 1
elif code == PIPE_CLOSED:
pipe_count -= 1
elif code == PIPE_OUTPUT:
yield data[1:]
if pipe_count == 0:
return
这对我有用(windows): https://github.com/waszil/subpiper
from subpiper import subpiper
def my_stdout_callback(line: str):
print(f'STDOUT: {line}')
def my_stderr_callback(line: str):
print(f'STDERR: {line}')
my_additional_path_list = [r'c:\important_location']
retcode = subpiper(cmd='echo magic',
stdout_callback=my_stdout_callback,
stderr_callback=my_stderr_callback,
add_path_list=my_additional_path_list)
这是一个基于 selectors
的解决方案,但它保留了顺序,并流式传输 variable-length 个字符(甚至是单个字符)。
诀窍是使用 read1()
,而不是 read()
。
import selectors
import subprocess
import sys
p = subprocess.Popen(
["python", "random_out.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)
while True:
for key, _ in sel.select():
data = key.fileobj.read1().decode()
if not data:
exit()
if key.fileobj is p.stdout:
print(data, end="")
else:
print(data, end="", file=sys.stderr)
如果你想要一个测试程序,使用这个。
import sys
from time import sleep
for i in range(10):
print(f" x{i} ", file=sys.stderr, end="")
sleep(0.1)
print(f" y{i} ", end="")
sleep(0.1)
这适用于 Python3 (3.6):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True)
# Read both stdout and stderr simultaneously
sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)
ok = True
while ok:
for key, val1 in sel.select():
line = key.fileobj.readline()
if not line:
ok = False
break
if key.fileobj is p.stdout:
print(f"STDOUT: {line}", end="")
else:
print(f"STDERR: {line}", end="", file=sys.stderr)
来自 https://docs.python.org/3/library/subprocess.html#using-the-subprocess-module
If you wish to capture and combine both streams into one, use stdout=PIPE and stderr=STDOUT instead of capture_output.
所以最简单的解决方案是:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout_iterator = iter(process.stdout.readline, b"")
for line in stdout_iterator:
# Do stuff with line
print line