为什么 `popen.stdout.readline` 会死锁以及如何处理它?
Why can `popen.stdout.readline` deadlock and what to do about it?
Warning Use communicate()
rather than .stdin.write
, .stdout.read
or .stderr.read
to avoid deadlocks due to any of the other OS pipe buffers filling up and blocking the child process.
我试图理解为什么这会陷入僵局。对于某些背景,我并行生成 N 个进程:
for c in commands:
h = subprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
handles.append(h)
然后逐一打印每个进程的输出:
for handle in handles:
while handle.poll() is None:
try:
line = handle.stdout.readline()
except UnicodeDecodeError:
line = "((INVALID UNICODE))\n"
sys.stdout.write(line)
if handle.returncode != 0:
print(handle.stdout.read(), file=sys.stdout)
if handle.returncode != 0:
print(handle.stderr.read(), file=sys.stderr)
有时这确实会造成死锁。不幸的是,文档中使用 communicate()
的建议对我不起作用,因为这个过程可能需要几分钟才能达到 运行,我不希望它在这段时间内出现死机。它应该实时打印输出。
我有几种选择,例如更改 bufsize
参数、在不同线程中为每个句柄轮询等。但是为了确定解决此问题的最佳方法,我认为我需要首先了解僵局的根本原因是什么。显然与缓冲区大小有关,但是什么?我可以假设,也许所有这些进程都共享一个 OS 内核对象,并且因为我只耗尽其中一个进程的缓冲区,其他进程会填满它,在这种情况下,上面的选项 2 会可能会修复它。但也许这甚至不是真正的问题。
任何人都可以阐明这一点吗?
父子进程之间的双向通信使用两个单向管道。每个方向一个。 OK,stderr是第三个,但是思路是一样的
一根管子有两端,一端写,一端读。管道的容量是 4K,现在在现代 Linux 上是 64K。人们可以期望在其他系统上有类似的价值。这意味着,写入器可以毫无问题地写入管道,直到达到其限制,但是随后管道变满并且写入阻塞,直到 reader 从另一端读取一些数据。
从reader的角度来看,情况是显而易见的。常规读取块,直到数据可用。
总结一下:当一个进程试图从一个没有人正在写入的管道中读取数据,或者当它向一个没有人正在读取的管道写入大于管道容量的数据时,就会发生死锁。
通常这两个进程充当客户端和服务器,并利用某种 request/response 风格的通信。类似半双工的东西。一方面是写作,另一方面是阅读。然后他们交换角色。这实际上是我们可以使用标准同步编程处理的最复杂的设置。当客户端和服务器以某种方式不同步时,仍然会发生死锁。这可能是由空响应、意外错误消息等引起的。
如果有多个子进程或者当通信协议不是那么简单或者我们只是想要一个健壮的解决方案时,我们需要父进程对所有管道进行操作。 communicate()
为此使用线程。另一种方法是异步的 I/O:首先检查哪个准备好执行 I/O 然后才从该管道(或套接字)读取或写入。旧的和弃用的 asyncore 库实现了这一点。
在底层,select(或类似的)系统调用检查给定集合中的哪些文件句柄已准备好用于 I/O。但是在那个低级别,我们只能在重新检查之前进行一次读取或写入。这就是这个片段的问题:
while handle.poll() is None:
try:
line = handle.stdout.readline()
except UnicodeDecodeError:
line = "((INVALID UNICODE))\n"
poll
检查告诉我们有东西要读取,但这并不意味着我们可以重复读取直到换行!我们只能进行一次读取并将数据附加到输入缓冲区。如果有换行符,我们可以提取整行并进行处理。如果没有,我们需要等待下一次成功的投票和阅读。
写入行为类似。我们可以写一次,检查写入的字节数并从输出缓冲区中删除那么多字节。
这意味着需要在此基础上实现行缓冲和所有更高级别的内容。幸运的是,asyncore
的继任者提供了我们需要的东西:asyncio > subprocesses.
我希望我能解释一下僵局。解决方案是可以预期的。如果您需要做几件事,请使用线程或 asyncio
.
更新:
下面是一个简短的 asyncio 测试程序。它从多个子进程读取输入并逐行打印数据。
但首先是一个 cmd.py
助手,它以几个小块打印一行以演示行缓冲。尝试使用例如python3 cmd.py 10
.
import sys
import time
def countdown(n):
print('START', n)
while n >= 0:
print(n, end=' ', flush=True)
time.sleep(0.1)
n -= 1
print('END')
if __name__ == '__main__':
args = sys.argv[1:]
if len(args) != 1:
sys.exit(3)
countdown(int(args[0]))
和主程序:
import asyncio
PROG = 'cmd.py'
NPROC = 12
async def run1(*execv):
"""Run a program, read input lines."""
proc = await asyncio.create_subprocess_exec(
*execv,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL)
# proc.stdout is a StreamReader object
async for line in proc.stdout:
print("Got line:", line.decode().strip())
async def manager(prog, nproc):
"""Spawn 'nproc' copies of python script 'prog'."""
tasks = [asyncio.create_task(run1('python3', prog, str(i))) for i in range(nproc)]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(manager(PROG, NPROC))
async for line ...
是 StreamReader
的一个特征,类似于 for line in file:
成语。可以替换为:
while True:
line = await proc.stdout.readline()
if not line:
break
print("Got line:", line.decode().strip())
Warning Use
communicate()
rather than.stdin.write
,.stdout.read
or.stderr.read
to avoid deadlocks due to any of the other OS pipe buffers filling up and blocking the child process.
我试图理解为什么这会陷入僵局。对于某些背景,我并行生成 N 个进程:
for c in commands:
h = subprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
handles.append(h)
然后逐一打印每个进程的输出:
for handle in handles:
while handle.poll() is None:
try:
line = handle.stdout.readline()
except UnicodeDecodeError:
line = "((INVALID UNICODE))\n"
sys.stdout.write(line)
if handle.returncode != 0:
print(handle.stdout.read(), file=sys.stdout)
if handle.returncode != 0:
print(handle.stderr.read(), file=sys.stderr)
有时这确实会造成死锁。不幸的是,文档中使用 communicate()
的建议对我不起作用,因为这个过程可能需要几分钟才能达到 运行,我不希望它在这段时间内出现死机。它应该实时打印输出。
我有几种选择,例如更改 bufsize
参数、在不同线程中为每个句柄轮询等。但是为了确定解决此问题的最佳方法,我认为我需要首先了解僵局的根本原因是什么。显然与缓冲区大小有关,但是什么?我可以假设,也许所有这些进程都共享一个 OS 内核对象,并且因为我只耗尽其中一个进程的缓冲区,其他进程会填满它,在这种情况下,上面的选项 2 会可能会修复它。但也许这甚至不是真正的问题。
任何人都可以阐明这一点吗?
父子进程之间的双向通信使用两个单向管道。每个方向一个。 OK,stderr是第三个,但是思路是一样的
一根管子有两端,一端写,一端读。管道的容量是 4K,现在在现代 Linux 上是 64K。人们可以期望在其他系统上有类似的价值。这意味着,写入器可以毫无问题地写入管道,直到达到其限制,但是随后管道变满并且写入阻塞,直到 reader 从另一端读取一些数据。
从reader的角度来看,情况是显而易见的。常规读取块,直到数据可用。
总结一下:当一个进程试图从一个没有人正在写入的管道中读取数据,或者当它向一个没有人正在读取的管道写入大于管道容量的数据时,就会发生死锁。
通常这两个进程充当客户端和服务器,并利用某种 request/response 风格的通信。类似半双工的东西。一方面是写作,另一方面是阅读。然后他们交换角色。这实际上是我们可以使用标准同步编程处理的最复杂的设置。当客户端和服务器以某种方式不同步时,仍然会发生死锁。这可能是由空响应、意外错误消息等引起的。
如果有多个子进程或者当通信协议不是那么简单或者我们只是想要一个健壮的解决方案时,我们需要父进程对所有管道进行操作。 communicate()
为此使用线程。另一种方法是异步的 I/O:首先检查哪个准备好执行 I/O 然后才从该管道(或套接字)读取或写入。旧的和弃用的 asyncore 库实现了这一点。
在底层,select(或类似的)系统调用检查给定集合中的哪些文件句柄已准备好用于 I/O。但是在那个低级别,我们只能在重新检查之前进行一次读取或写入。这就是这个片段的问题:
while handle.poll() is None:
try:
line = handle.stdout.readline()
except UnicodeDecodeError:
line = "((INVALID UNICODE))\n"
poll
检查告诉我们有东西要读取,但这并不意味着我们可以重复读取直到换行!我们只能进行一次读取并将数据附加到输入缓冲区。如果有换行符,我们可以提取整行并进行处理。如果没有,我们需要等待下一次成功的投票和阅读。
写入行为类似。我们可以写一次,检查写入的字节数并从输出缓冲区中删除那么多字节。
这意味着需要在此基础上实现行缓冲和所有更高级别的内容。幸运的是,asyncore
的继任者提供了我们需要的东西:asyncio > subprocesses.
我希望我能解释一下僵局。解决方案是可以预期的。如果您需要做几件事,请使用线程或 asyncio
.
更新:
下面是一个简短的 asyncio 测试程序。它从多个子进程读取输入并逐行打印数据。
但首先是一个 cmd.py
助手,它以几个小块打印一行以演示行缓冲。尝试使用例如python3 cmd.py 10
.
import sys
import time
def countdown(n):
print('START', n)
while n >= 0:
print(n, end=' ', flush=True)
time.sleep(0.1)
n -= 1
print('END')
if __name__ == '__main__':
args = sys.argv[1:]
if len(args) != 1:
sys.exit(3)
countdown(int(args[0]))
和主程序:
import asyncio
PROG = 'cmd.py'
NPROC = 12
async def run1(*execv):
"""Run a program, read input lines."""
proc = await asyncio.create_subprocess_exec(
*execv,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL)
# proc.stdout is a StreamReader object
async for line in proc.stdout:
print("Got line:", line.decode().strip())
async def manager(prog, nproc):
"""Spawn 'nproc' copies of python script 'prog'."""
tasks = [asyncio.create_task(run1('python3', prog, str(i))) for i in range(nproc)]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(manager(PROG, NPROC))
async for line ...
是 StreamReader
的一个特征,类似于 for line in file:
成语。可以替换为:
while True:
line = await proc.stdout.readline()
if not line:
break
print("Got line:", line.decode().strip())