在 python 中将 Pipe 与多处理一起使用时出现 EOFError
EOFError while using Pipe with multiprocessing in python
我正在尝试使用多处理模型来加快模型的检测速度。我想使用四个内核并以并行方式处理图像,同时保持它们的顺序。当一个进程被派生时,一个管道也被实例化,子部分被传递给进程,而父部分被保存在一个单独的队列中。即使在轮询管道后,脚本也会在我调用父管道上的 recv 方法时抛出 EOFError。
在这个程序的一个更简单的版本中,我只是发送了“hello”而不是图像,但仍然抛出了 EOFError。另外,我尝试了关闭和不关闭管道子端的脚本,但仍然抛出错误
import io
import time
import tensorflow as tf
physical_devices = tf.config.experimental.list_physical_devices('GPU')
if len(physical_devices) > 0:
tf.config.experimental.set_memory_growth(physical_devices[0], True)
from absl import app, flags, logging
import core.utils as utils
from core.yolov4 import filter_boxes
from tensorflow.python.saved_model import tag_constants
from PIL import Image
import cv2
import numpy as np
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
from queue import Queue
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool, Process, Pipe
flag_rep = {
"framework" : 'tflite',
"weights" : './checkpoints/yolov4-tiny-416.tflite',
"size" : 416,
"tiny" : False,
"model" : 'yolov4',
"iou" : .45,
"score" : .25,
}
vid = cv2.VideoCapture(0)
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)
STRIDES, ANCHORS, NUM_CLASS, XYSCALE = utils.load_config_from_dict(flag_rep)
input_size = flag_rep['size']
interpreter = tf.lite.Interpreter(model_path=flag_rep['weights'])
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
i=0
def eval_im(frame, child):
original_image = frame
original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)
image_data = cv2.resize(original_image, (input_size, input_size))
image_data = image_data / 255.
images_data = []
for i in range(1):
images_data.append(image_data)
images_data = np.asarray(images_data).astype(np.float32)
interpreter.set_tensor(input_details[0]['index'], images_data)
interpreter.invoke()
pred = [interpreter.get_tensor(output_details[i]['index']) for i in range(len(output_details))]
boxes, pred_conf = filter_boxes(pred[0], pred[1], score_threshold=0.25, input_shape=tf.constant([input_size, input_size]))
boxes, scores, classes, valid_detections = tf.image.combined_non_max_suppression(
boxes=tf.reshape(boxes, (tf.shape(boxes)[0], -1, 1, 4)),
scores=tf.reshape(
pred_conf, (tf.shape(pred_conf)[0], -1, tf.shape(pred_conf)[-1])),
max_output_size_per_class=50,
max_total_size=50,
iou_threshold=flag_rep['iou'],
score_threshold=flag_rep['score']
)
pred_bbox = [boxes.numpy(), scores.numpy(), classes.numpy(), valid_detections.numpy()]
image = utils.draw_bbox(original_image, pred_bbox)
child.send([img])
child.close()
if __name__ == "__main__":
processes = Queue()
parent_pipes = Queue()
overflow = Queue()
while True:
ret, pic = vid.read()
overflow.put(pic)
if processes.qsize() < 4:
parent, child = Pipe()
parent_pipes.put(parent)
process = Process(target=eval_im, args=(overflow.get(), child))
processes.put(process)
process.start()
print(parent_pipes.qsize())
if parent_pipes.queue[0].poll():
process = processes.get()
process.join()
parent_pipe = parent_pipes.get()
img = parent_pipe.recv()
parent_pipe.close()
# cv2.imshow("frame", img)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
vid.release()
cv2.destroyAllWindows()
这是输出:
Traceback (most recent call last):
File "testing_multi.py", line 89, in <module>
img = parent_pipe.recv()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
致现在遇到此问题的任何人。我发现这个错误的发生是因为 python 中的全局解释器锁。我不是那个 well-read 就可以了。但要点是一次只有一个线程可以使用 python 解释器。并发和 concurrent.futures 等模块可以通过产生多个解释器来解决这个问题。但是,这两个实例之间很难共享数据。因此,在其中一些模块中存在管道和队列等工具,允许这些实例进行通信。但是,这些工具要求对象是“可腌制的”。在这种情况下,我的对象是不可腌制的。因此我得到了一个错误。
我正在尝试使用多处理模型来加快模型的检测速度。我想使用四个内核并以并行方式处理图像,同时保持它们的顺序。当一个进程被派生时,一个管道也被实例化,子部分被传递给进程,而父部分被保存在一个单独的队列中。即使在轮询管道后,脚本也会在我调用父管道上的 recv 方法时抛出 EOFError。
在这个程序的一个更简单的版本中,我只是发送了“hello”而不是图像,但仍然抛出了 EOFError。另外,我尝试了关闭和不关闭管道子端的脚本,但仍然抛出错误
import io
import time
import tensorflow as tf
physical_devices = tf.config.experimental.list_physical_devices('GPU')
if len(physical_devices) > 0:
tf.config.experimental.set_memory_growth(physical_devices[0], True)
from absl import app, flags, logging
import core.utils as utils
from core.yolov4 import filter_boxes
from tensorflow.python.saved_model import tag_constants
from PIL import Image
import cv2
import numpy as np
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
from queue import Queue
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool, Process, Pipe
flag_rep = {
"framework" : 'tflite',
"weights" : './checkpoints/yolov4-tiny-416.tflite',
"size" : 416,
"tiny" : False,
"model" : 'yolov4',
"iou" : .45,
"score" : .25,
}
vid = cv2.VideoCapture(0)
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)
STRIDES, ANCHORS, NUM_CLASS, XYSCALE = utils.load_config_from_dict(flag_rep)
input_size = flag_rep['size']
interpreter = tf.lite.Interpreter(model_path=flag_rep['weights'])
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
i=0
def eval_im(frame, child):
original_image = frame
original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)
image_data = cv2.resize(original_image, (input_size, input_size))
image_data = image_data / 255.
images_data = []
for i in range(1):
images_data.append(image_data)
images_data = np.asarray(images_data).astype(np.float32)
interpreter.set_tensor(input_details[0]['index'], images_data)
interpreter.invoke()
pred = [interpreter.get_tensor(output_details[i]['index']) for i in range(len(output_details))]
boxes, pred_conf = filter_boxes(pred[0], pred[1], score_threshold=0.25, input_shape=tf.constant([input_size, input_size]))
boxes, scores, classes, valid_detections = tf.image.combined_non_max_suppression(
boxes=tf.reshape(boxes, (tf.shape(boxes)[0], -1, 1, 4)),
scores=tf.reshape(
pred_conf, (tf.shape(pred_conf)[0], -1, tf.shape(pred_conf)[-1])),
max_output_size_per_class=50,
max_total_size=50,
iou_threshold=flag_rep['iou'],
score_threshold=flag_rep['score']
)
pred_bbox = [boxes.numpy(), scores.numpy(), classes.numpy(), valid_detections.numpy()]
image = utils.draw_bbox(original_image, pred_bbox)
child.send([img])
child.close()
if __name__ == "__main__":
processes = Queue()
parent_pipes = Queue()
overflow = Queue()
while True:
ret, pic = vid.read()
overflow.put(pic)
if processes.qsize() < 4:
parent, child = Pipe()
parent_pipes.put(parent)
process = Process(target=eval_im, args=(overflow.get(), child))
processes.put(process)
process.start()
print(parent_pipes.qsize())
if parent_pipes.queue[0].poll():
process = processes.get()
process.join()
parent_pipe = parent_pipes.get()
img = parent_pipe.recv()
parent_pipe.close()
# cv2.imshow("frame", img)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
vid.release()
cv2.destroyAllWindows()
这是输出:
Traceback (most recent call last):
File "testing_multi.py", line 89, in <module>
img = parent_pipe.recv()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
致现在遇到此问题的任何人。我发现这个错误的发生是因为 python 中的全局解释器锁。我不是那个 well-read 就可以了。但要点是一次只有一个线程可以使用 python 解释器。并发和 concurrent.futures 等模块可以通过产生多个解释器来解决这个问题。但是,这两个实例之间很难共享数据。因此,在其中一些模块中存在管道和队列等工具,允许这些实例进行通信。但是,这些工具要求对象是“可腌制的”。在这种情况下,我的对象是不可腌制的。因此我得到了一个错误。