'Broken Pipe' 在循环中重复使用同一管道时出错
'Broken Pipe' error when reusing the same pipe inside a loop
我是进程间通信的新手,我想了解 os.pipe
和 os.fork
在 Python 中的相互用法。
在下面的代码中,如果我取消对 "Broken Pipe" 行的注释,则会出现错误,否则它工作正常。
想法是在子进程退出时有一个 SIGCHLD 处理程序,并在仅子函数 (run_child) 和仅父函数 (sigchld_handler) 执行时递增相应的计数器。由于分叉进程将有自己的内存版本并且更改不会反映在父进程中,因此尝试让子进程通过管道向父进程发送消息并让父进程更新计数器。
import os
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
self.rd , self.wr = os.pipe()
print self.rd , self.wr
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
print "Main run count : (parent) ", self.parent
#rf = os.fdopen(self.rd, 'r')
#self.child = int(rf.read())
#rf.close()
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
wr = os.fdopen(self.wr,'w')
text = "%s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break
有趣的是,在前几次迭代后出现错误。有人可以解释为什么会出现错误以及我应该怎么做才能解决这个问题。
编辑 1:
有几个类似的例子: ex1 , ex2 , ex3 。实际上我只是用它们来学习,但在我的例子中,我将示例扩展到循环中的 运行 以更像一个 producer/consumer 队列。我知道这可能不是一个好方法,因为 multiprocess/Queue 模块在 Python 中可用,但我想了解我在这里犯的错误。
编辑 2(解决方案):
基于,修改代码为每次通信创建一个新管道。这是修改后的代码。
import os
import pdb
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
os.close(self.wr)
print "Main run count : (parent) ", self.parent
rd = os.fdopen(self.rd, 'r')
self.child = int(rd.read())
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
os.close(self.rd)
wr = os.fdopen(self.wr, 'w')
text = "%s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
self.rd , self.wr = os.pipe()
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break
有了这个,输出应该是这样的。
Main run count (child) : 1
Running in child : 15752
C==> 1
Main run count : (parent) 1
Main run count (child) : 2
Running in child : 15753
C==> 2
Main run count : (parent) 2
Main run count (child) : 3
Running in child : 15754
C==> 3
Main run count : (parent) 3
Main run count (child) : 4
Running in child : 15755
C==> 4
Main run count : (parent) 4
Main run count (child) : 5
Running in child : 15756
C==> 5
Main run count : (parent) 5
Main run count (child) : 6
Running in child : 15757
C==> 6
Main run count : (parent) 6
您的代码存在的问题是您试图多次重复使用一个管道,而这通常不是管道的有效情况。
你得到的例外只是说你:"Hey, you have closed this pipe on the previous run. Once a pipe is closed, it's closed.".
因此您可以更改代码为每个 child 创建一个管道,将一端(读取)存储在 "parent" 中并将另一端提供给 child。然后它应该工作。
编辑 1. 我已经用关于 "one pipe for every child" 的东西更新了你的代码,这不是好的代码应该是怎样的,但在教育意义上希望它会有所帮助。
import os
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
print "Main run count : (parent) ", self.parent
os.close(self.wr)
rf = os.fdopen(self.rd, 'r')
message = rf.read()
rf.close()
print "Code from child [", self._child_pid, "]: ", message
self.rd = None
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
os.close(self.rd)
wr = os.fdopen(self.wr, 'w')
text = "Hello from %s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
rd, wr = os.pipe()
self.rd = rd
self.wr = wr
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break
我是进程间通信的新手,我想了解 os.pipe
和 os.fork
在 Python 中的相互用法。
在下面的代码中,如果我取消对 "Broken Pipe" 行的注释,则会出现错误,否则它工作正常。
想法是在子进程退出时有一个 SIGCHLD 处理程序,并在仅子函数 (run_child) 和仅父函数 (sigchld_handler) 执行时递增相应的计数器。由于分叉进程将有自己的内存版本并且更改不会反映在父进程中,因此尝试让子进程通过管道向父进程发送消息并让父进程更新计数器。
import os
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
self.rd , self.wr = os.pipe()
print self.rd , self.wr
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
print "Main run count : (parent) ", self.parent
#rf = os.fdopen(self.rd, 'r')
#self.child = int(rf.read())
#rf.close()
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
wr = os.fdopen(self.wr,'w')
text = "%s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break
有趣的是,在前几次迭代后出现错误。有人可以解释为什么会出现错误以及我应该怎么做才能解决这个问题。
编辑 1: 有几个类似的例子: ex1 , ex2 , ex3 。实际上我只是用它们来学习,但在我的例子中,我将示例扩展到循环中的 运行 以更像一个 producer/consumer 队列。我知道这可能不是一个好方法,因为 multiprocess/Queue 模块在 Python 中可用,但我想了解我在这里犯的错误。
编辑 2(解决方案):
基于
import os
import pdb
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
os.close(self.wr)
print "Main run count : (parent) ", self.parent
rd = os.fdopen(self.rd, 'r')
self.child = int(rd.read())
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
os.close(self.rd)
wr = os.fdopen(self.wr, 'w')
text = "%s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
self.rd , self.wr = os.pipe()
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break
有了这个,输出应该是这样的。
Main run count (child) : 1
Running in child : 15752
C==> 1
Main run count : (parent) 1
Main run count (child) : 2
Running in child : 15753
C==> 2
Main run count : (parent) 2
Main run count (child) : 3
Running in child : 15754
C==> 3
Main run count : (parent) 3
Main run count (child) : 4
Running in child : 15755
C==> 4
Main run count : (parent) 4
Main run count (child) : 5
Running in child : 15756
C==> 5
Main run count : (parent) 5
Main run count (child) : 6
Running in child : 15757
C==> 6
Main run count : (parent) 6
您的代码存在的问题是您试图多次重复使用一个管道,而这通常不是管道的有效情况。 你得到的例外只是说你:"Hey, you have closed this pipe on the previous run. Once a pipe is closed, it's closed.".
因此您可以更改代码为每个 child 创建一个管道,将一端(读取)存储在 "parent" 中并将另一端提供给 child。然后它应该工作。
编辑 1. 我已经用关于 "one pipe for every child" 的东西更新了你的代码,这不是好的代码应该是怎样的,但在教育意义上希望它会有所帮助。
import os
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
print "Main run count : (parent) ", self.parent
os.close(self.wr)
rf = os.fdopen(self.rd, 'r')
message = rf.read()
rf.close()
print "Code from child [", self._child_pid, "]: ", message
self.rd = None
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
os.close(self.rd)
wr = os.fdopen(self.wr, 'w')
text = "Hello from %s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
rd, wr = os.pipe()
self.rd = rd
self.wr = wr
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break