Python 中的多进程(multiprocessing):用于图像批量采集(流式读取)与批处理
Multiprocessing in Python for image batch streaming and processing
我用 Python 开发了一段多进程代码。
Image batch streaming 在一个进程中完成,在另一个进程中进行批处理。
一旦批流进程累积到预定义数量的图像,就使用 multiprocessing.Event() 向批处理循环发送信号。因此这两个进程需要在时序上正确同步。
批流时间比批处理时间长,所以处理部分没有图像丢失。
大部分时间批处理时间比批流时间短,所以批处理这一侧看起来工作正常。但是有时会发现 batch streaming 连续出现两次,然后才进行一次 batch processing,例如:
batch streaming 2.35
batch processing 2.05
batch streaming 2.25
batch processing 2.05
batch streaming 2.32 repeated
batch streaming 2.36
batch processing 3.25
batch streaming 2.35
batch processing 2.15
batch streaming 2.35
batch processing 2.25
这意味着我在处理部分有图像丢失。
我该如何解决这个问题?
我无法 post 整个代码。
所以两个过程如下。
批处理流循环
# Batch-streaming (producer) loop.
#
# Pulls (cmd, val) messages from cam_queue, stores each frame into one of two
# buffer pairs (batch1_/batch3_ or batch2_/batch4_), and once BATCHSIZE frames
# have been collected puts the filled pair's name on q and sets the event e.
#
# NOTE(review): the indentation of this fragment was lost in the source; the
# nesting below is reconstructed. In particular the "miss frame" branch is
# assumed to pair with `val is None` -- confirm against the original file.
while not stopbit.is_set():
    if cam_queue.empty():
        # Nothing available yet; poll again.
        continue

    cmd, val = cam_queue.get()
    # (A per-frame FPS printout based on lastFTime used to sit here; it is
    # disabled, as is the handling of vs.StreamCommands.RESOLUTION.)
    if cmd != vs.StreamCommands.FRAME:
        continue

    if val is None:
        # A FRAME message without image data counts as a missed frame.
        missCount += 1
        print("miss frame after " + str(time.time() - startTime))
        if missCount >= 10:
            q.put('lostframes')
            # Set the event right away so the receiver can stop with the
            # 'lostframes' option.
            e.set()
        continue

    missCount = 0

    # Convert to float32, move axis 2 to the front (presumably HWC -> CHW;
    # TODO confirm) and flatten to a 921,600-element array.
    reordered = np.array(val, dtype=np.float32, order='C').transpose([2, 0, 1])
    flat_frame = reordered.reshape(921600)

    # batch1_is_processed is False while pair 1 is being filled and True
    # while pair 2 is being filled.
    if batch1_is_processed:
        flat_slots, raw_slots = batch2_, batch4_
    else:
        flat_slots, raw_slots = batch1_, batch3_
    flat_slots[count] = flat_frame
    raw_slots[count] = val
    count += 1

    if count < BATCHSIZE:
        continue

    # The batch is full: hand it over for inference and post-processing.
    elapsed = time.time() - lastFTime
    print("batching time " + str(elapsed))
    q.put('batch2' if batch1_is_processed else 'batch1')
    batch1_is_processed = not batch1_is_processed  # switch to the other pair
    e.set()  # signal that the buffer is full
    count = 0
    lastFTime = time.time()
批处理
# Batch-processing (consumer) loop.
#
# Takes the name of a filled buffer pair from self.queue, copies the frames out
# of the shared buffers, runs TensorRT inference and hands the estimated humans
# to hdp.ProcessHumanData.
#
# Synchronisation: the loop blocks on self.queue.get() only. It used to call
# self.e.wait() first and clear the event after the batch had been processed.
# When the streaming process set the event for the *next* batch while this one
# was still being processed, the trailing clear() erased that signal and the
# loop then slept in wait() although a batch name was already queued, so the
# batch was picked up one streaming cycle late. Queue.get() already blocks
# until an item is available, so no separate event wait is needed.
#
# NOTE(review): `self.stopbit is not None` stays true for as long as stopbit is
# an Event object; the loop effectively ends only through the 'lostframes'
# break below -- confirm that this is intended.
# NOTE(review): nothing in this loop prevents the streaming side from refilling
# a shared buffer while it is still being read here when processing is slower
# than streaming; that presumably needs a Lock around buffer access -- verify.
# NOTE(review): indentation was lost in the source; the extent of the `with`
# block below is reconstructed.
while self.stopbit is not None:
    batch = self.queue.get()  # blocks until the streaming side queues a name
    lastFTime = time.time()
    if batch == 'batch1':  # process batch1
        for idx in range(BATCHSIZE):
            images[idx] = np.frombuffer(self.sharedbatch1[idx], dtype=np.float32)
            uimg = np.frombuffer(self.sharedbatch3[idx], dtype=np.uint8)
            uimgs[idx] = uimg.reshape(HEIGHT, WIDTH, CHANNEL)
    elif batch == 'batch2':  # process batch2
        for idx in range(BATCHSIZE):
            images[idx] = np.frombuffer(self.sharedbatch2[idx], dtype=np.float32)
            uimg = np.frombuffer(self.sharedbatch4[idx], dtype=np.uint8)
            uimgs[idx] = uimg.reshape(HEIGHT, WIDTH, CHANNEL)
    elif batch == 'lostframes':
        self.e.clear()
        self.stopbit.set()  # to stop streaming
        break

    # Do the batch processing in Nvidia's TensorRT.
    # NOTE(review): the execution context and buffers are re-created for every
    # batch; hoisting them out of the loop may reduce per-batch time -- verify
    # against the helpers in `common` before changing.
    with engine.create_execution_context() as context:
        inputs, outputs, bindings, stream = common.allocate_buffers(engine)
        inputs[0].host = np.ascontiguousarray(images, dtype=np.float32)
        [outputs] = common.do_inference(context, bindings, inputs, outputs, stream, BATCHSIZE)
        outputs = outputs.reshape((BATCHSIZE, 60, 80, 57))
        humans = []
        for i in range(BATCHSIZE):
            # The first 19 of the 57 channels go in as heat_map, the remaining
            # 38 as puf_map.
            heat_map = outputs[i, :, :, :19]
            puf_map = outputs[i, :, :, 19:]
            humans.append(self.est.inference(heat_map, puf_map, 4.0))
        hdp.ProcessHumanData(humans, uimgs)

    humans.clear()
    diffTime = time.time() - lastFTime
    print("batch processing time " + str(diffTime))
    # Kept so the event's state stays as before for any other user of it; this
    # loop itself no longer waits on the event.
    self.e.clear()
问题已通过使用 multiprocessing 中的 Lock 解决。另外,用 print 进行调试有时会产生误导:打印本身就需要若干毫秒,在调试并行处理代码时需要注意这一点。
我用 Python 开发了一段多进程代码。图像批量采集(batch streaming)在一个进程中完成,批处理在另一个进程中进行。
一旦批流进程累积到预定义数量的图像,就使用 multiprocessing.Event() 向批处理循环发送信号。因此这两个进程需要在时序上正确同步。
批流时间比批处理时间长,所以处理部分没有图像丢失。
大部分时间批处理时间比批流时间短,所以批处理这一侧看起来工作正常。但是有时会发现 batch streaming 连续出现两次,然后才进行一次 batch processing,例如:
batch streaming 2.35
batch processing 2.05
batch streaming 2.25
batch processing 2.05
batch streaming 2.32 repeated
batch streaming 2.36
batch processing 3.25
batch streaming 2.35
batch processing 2.15
batch streaming 2.35
batch processing 2.25
这意味着我在处理部分有图像丢失。 我该如何解决这个问题?
我无法 post 整个代码。 所以两个过程如下。
批处理流循环
# Batch-streaming (producer) loop.
#
# Pulls (cmd, val) messages from cam_queue, stores each frame into one of two
# buffer pairs (batch1_/batch3_ or batch2_/batch4_), and once BATCHSIZE frames
# have been collected puts the filled pair's name on q and sets the event e.
#
# NOTE(review): the indentation of this fragment was lost in the source; the
# nesting below is reconstructed. In particular the "miss frame" branch is
# assumed to pair with `val is None` -- confirm against the original file.
while not stopbit.is_set():
    if cam_queue.empty():
        # Nothing available yet; poll again.
        continue

    cmd, val = cam_queue.get()
    # (A per-frame FPS printout based on lastFTime used to sit here; it is
    # disabled, as is the handling of vs.StreamCommands.RESOLUTION.)
    if cmd != vs.StreamCommands.FRAME:
        continue

    if val is None:
        # A FRAME message without image data counts as a missed frame.
        missCount += 1
        print("miss frame after " + str(time.time() - startTime))
        if missCount >= 10:
            q.put('lostframes')
            # Set the event right away so the receiver can stop with the
            # 'lostframes' option.
            e.set()
        continue

    missCount = 0

    # Convert to float32, move axis 2 to the front (presumably HWC -> CHW;
    # TODO confirm) and flatten to a 921,600-element array.
    reordered = np.array(val, dtype=np.float32, order='C').transpose([2, 0, 1])
    flat_frame = reordered.reshape(921600)

    # batch1_is_processed is False while pair 1 is being filled and True
    # while pair 2 is being filled.
    if batch1_is_processed:
        flat_slots, raw_slots = batch2_, batch4_
    else:
        flat_slots, raw_slots = batch1_, batch3_
    flat_slots[count] = flat_frame
    raw_slots[count] = val
    count += 1

    if count < BATCHSIZE:
        continue

    # The batch is full: hand it over for inference and post-processing.
    elapsed = time.time() - lastFTime
    print("batching time " + str(elapsed))
    q.put('batch2' if batch1_is_processed else 'batch1')
    batch1_is_processed = not batch1_is_processed  # switch to the other pair
    e.set()  # signal that the buffer is full
    count = 0
    lastFTime = time.time()
批处理
# Batch-processing (consumer) loop.
#
# Takes the name of a filled buffer pair from self.queue, copies the frames out
# of the shared buffers, runs TensorRT inference and hands the estimated humans
# to hdp.ProcessHumanData.
#
# Synchronisation: the loop blocks on self.queue.get() only. It used to call
# self.e.wait() first and clear the event after the batch had been processed.
# When the streaming process set the event for the *next* batch while this one
# was still being processed, the trailing clear() erased that signal and the
# loop then slept in wait() although a batch name was already queued, so the
# batch was picked up one streaming cycle late. Queue.get() already blocks
# until an item is available, so no separate event wait is needed.
#
# NOTE(review): `self.stopbit is not None` stays true for as long as stopbit is
# an Event object; the loop effectively ends only through the 'lostframes'
# break below -- confirm that this is intended.
# NOTE(review): nothing in this loop prevents the streaming side from refilling
# a shared buffer while it is still being read here when processing is slower
# than streaming; that presumably needs a Lock around buffer access -- verify.
# NOTE(review): indentation was lost in the source; the extent of the `with`
# block below is reconstructed.
while self.stopbit is not None:
    batch = self.queue.get()  # blocks until the streaming side queues a name
    lastFTime = time.time()
    if batch == 'batch1':  # process batch1
        for idx in range(BATCHSIZE):
            images[idx] = np.frombuffer(self.sharedbatch1[idx], dtype=np.float32)
            uimg = np.frombuffer(self.sharedbatch3[idx], dtype=np.uint8)
            uimgs[idx] = uimg.reshape(HEIGHT, WIDTH, CHANNEL)
    elif batch == 'batch2':  # process batch2
        for idx in range(BATCHSIZE):
            images[idx] = np.frombuffer(self.sharedbatch2[idx], dtype=np.float32)
            uimg = np.frombuffer(self.sharedbatch4[idx], dtype=np.uint8)
            uimgs[idx] = uimg.reshape(HEIGHT, WIDTH, CHANNEL)
    elif batch == 'lostframes':
        self.e.clear()
        self.stopbit.set()  # to stop streaming
        break

    # Do the batch processing in Nvidia's TensorRT.
    # NOTE(review): the execution context and buffers are re-created for every
    # batch; hoisting them out of the loop may reduce per-batch time -- verify
    # against the helpers in `common` before changing.
    with engine.create_execution_context() as context:
        inputs, outputs, bindings, stream = common.allocate_buffers(engine)
        inputs[0].host = np.ascontiguousarray(images, dtype=np.float32)
        [outputs] = common.do_inference(context, bindings, inputs, outputs, stream, BATCHSIZE)
        outputs = outputs.reshape((BATCHSIZE, 60, 80, 57))
        humans = []
        for i in range(BATCHSIZE):
            # The first 19 of the 57 channels go in as heat_map, the remaining
            # 38 as puf_map.
            heat_map = outputs[i, :, :, :19]
            puf_map = outputs[i, :, :, 19:]
            humans.append(self.est.inference(heat_map, puf_map, 4.0))
        hdp.ProcessHumanData(humans, uimgs)

    humans.clear()
    diffTime = time.time() - lastFTime
    print("batch processing time " + str(diffTime))
    # Kept so the event's state stays as before for any other user of it; this
    # loop itself no longer waits on the event.
    self.e.clear()
问题已通过使用 multiprocessing 中的 Lock 解决。另外,用 print 进行调试有时会产生误导:打印本身就需要若干毫秒,在调试并行处理代码时需要注意这一点。