Python 并行处理图像批处理
Parallel processing in Python for image batching
我喜欢并行使用两个函数,一个用于图像批处理(流式传输所有 25 张图像以进行处理),另一个用于处理批处理图像。它们需要并行。
所以我有批处理图像 BatchStreaming(self)
和处理 BatchProcessing(self, b_num)
的主要功能。现在 BatchStreaming 运行良好。流完25张图片后,需要进行批处理。我有两个并行进程。他们是
(1)While loop in BatchStreaming
需要继续处理另一批图像。
(2)同时需要处理当前批处理的图片
我很困惑是应该使用进程还是线程。我更喜欢进程,因为我喜欢利用 CPU 中的所有内核。 (Python 的线程 运行 仅在一个 CPU 核心上)
那我有两个问题
(1)进程必须加入回主程序才能继续。但我需要继续下一批图像。
(2)在下面的程序中,当BatchProcessing(self, b_num)
被调用时出现异常
Caught Main Exception
(<class 'TypeError'>, TypeError("'module' object is not callable",), <traceback object at 0x7f98635dcfc8>)
可能是什么问题?
代码如下
import multiprocessing as MultiProcess
import time
import vid_streamv3 as vs
import cv2
import sys
import numpy as np
import os
BATCHSIZE=25
CHANNEL=3
HEIGHT=480
WIDTH=640
ORGHEIGHT=1080
ORGWIDTH=1920
class ProcessPipeline:
def __init__(self):
#Current Cam
self.camProcess = None
self.cam_queue = MultiProcess.Queue(maxsize=100)
self.stopbit = None
self.camlink = 'rtsp://root:pass@192.168.0.90/axis-media/media.amp?camera=1' #Add your RTSP cam link
self.framerate = 25
self.fullsize_batch1=np.zeros((BATCHSIZE, ORGHEIGHT, ORGWIDTH, CHANNEL), dtype=np.uint8)
self.fullsize_batch2=np.zeros((BATCHSIZE, ORGHEIGHT, ORGWIDTH, CHANNEL), dtype=np.uint8)
self.batch1_is_processed=False
def BatchStreaming(self):
#get all cams
time.sleep(3)
self.stopbit = MultiProcess.Event()
self.camProcess = vs.StreamCapture(self.camlink,
self.stopbit,
self.cam_queue,
self.framerate)
self.camProcess.start()
count=0
try:
while True:
if not self.cam_queue.empty():
cmd, val = self.cam_queue.get()
if cmd == vs.StreamCommands.FRAME:
if val is not None:
print('streaming starts ')
if(self.batch1_is_processed == False):
self.fullsize_batch1[count]=val
else:
self.fullsize_batch2[count]=val
count=count+1
if(count>=25):
if(self.batch1_is_processed == False):#to start process for inference and post processing for batch 1
self.batch1_is_processed = True
print('batch 1 process')
p = MultiProcess(target=self.BatchProcessing, args=(1,))
else:#to start process for inference and post processing for batch 2
self.batch1_is_processed = False
print('batch 2 process')
p = MultiProcess(target=self.BatchProcessing, args=(2,))
p.start()
print('BatchProcessing start')
p.join()
print('BatchProcessing join')
count=0
cv2.imshow('Cam: ' + self.camlink, val)
cv2.waitKey(1)
except KeyboardInterrupt:
print('Caught Keyboard interrupt')
except:
e = sys.exc_info()
print('Caught Main Exception')
print(e)
self.StopStreaming()
cv2.destroyAllWindows()
def StopStreaming(self):
print('in stopCamStream')
if self.stopbit is not None:
self.stopbit.set()
while not self.cam_queue.empty():
try:
_ = self.cam_queue.get()
except:
break
self.cam_queue.close()
print("before camProcess.join()")
self.camProcess.join()
print("after camProcess.join()")
def BatchProcessing(self, b_num):
print('module name:', __name__)
if hasattr(os, 'getppid'): # only available on Unix
print('parent process:', os.getppid())
print('process id:', os.getpid())
if __name__ == "__main__":
mc = ProcessPipeline()
mc.BatchStreaming()
我使用了事件信号,如下所示。
这对我的申请来说更直接。
当批处理循环有足够的图像时,发出批处理信号。
#event_tut.py
import random, time
from threading import Event, Thread
event = Event()
def waiter(event, nloops):
count=0
while(count<10):
print("%s. Waiting for the flag to be set." % (i+1))
event.wait() # Blocks until the flag becomes true.
print("Wait complete at:", time.ctime())
event.clear() # Resets the flag.
print('wait exit')
count=count+1
def setter(event, nloops):
for i in range(nloops):
time.sleep(random.randrange(2, 5)) # Sleeps for some time.
event.set()
threads = []
nloops = 10
threads.append(Thread(target=waiter, args=(event, nloops)))
threads[-1].start()
threads.append(Thread(target=setter, args=(event, nloops)))
threads[-1].start()
for thread in threads:
thread.join()
print("All done.")
我喜欢并行使用两个函数,一个用于图像批处理(流式传输所有 25 张图像以进行处理),另一个用于处理批处理图像。它们需要并行。
所以我有批处理图像 BatchStreaming(self)
和处理 BatchProcessing(self, b_num)
的主要功能。现在 BatchStreaming 运行良好。流完25张图片后,需要进行批处理。我有两个并行进程。他们是
(1)While loop in BatchStreaming
需要继续处理另一批图像。
(2)同时需要处理当前批处理的图片
我很困惑是应该使用进程还是线程。我更喜欢进程,因为我喜欢利用 CPU 中的所有内核。 (Python 的线程 运行 仅在一个 CPU 核心上)
那我有两个问题 (1)进程必须加入回主程序才能继续。但我需要继续下一批图像。
(2)在下面的程序中,当BatchProcessing(self, b_num)
被调用时出现异常
Caught Main Exception
(<class 'TypeError'>, TypeError("'module' object is not callable",), <traceback object at 0x7f98635dcfc8>)
可能是什么问题?
代码如下
import multiprocessing as MultiProcess
import time
import vid_streamv3 as vs
import cv2
import sys
import numpy as np
import os
BATCHSIZE=25
CHANNEL=3
HEIGHT=480
WIDTH=640
ORGHEIGHT=1080
ORGWIDTH=1920
class ProcessPipeline:
def __init__(self):
#Current Cam
self.camProcess = None
self.cam_queue = MultiProcess.Queue(maxsize=100)
self.stopbit = None
self.camlink = 'rtsp://root:pass@192.168.0.90/axis-media/media.amp?camera=1' #Add your RTSP cam link
self.framerate = 25
self.fullsize_batch1=np.zeros((BATCHSIZE, ORGHEIGHT, ORGWIDTH, CHANNEL), dtype=np.uint8)
self.fullsize_batch2=np.zeros((BATCHSIZE, ORGHEIGHT, ORGWIDTH, CHANNEL), dtype=np.uint8)
self.batch1_is_processed=False
def BatchStreaming(self):
#get all cams
time.sleep(3)
self.stopbit = MultiProcess.Event()
self.camProcess = vs.StreamCapture(self.camlink,
self.stopbit,
self.cam_queue,
self.framerate)
self.camProcess.start()
count=0
try:
while True:
if not self.cam_queue.empty():
cmd, val = self.cam_queue.get()
if cmd == vs.StreamCommands.FRAME:
if val is not None:
print('streaming starts ')
if(self.batch1_is_processed == False):
self.fullsize_batch1[count]=val
else:
self.fullsize_batch2[count]=val
count=count+1
if(count>=25):
if(self.batch1_is_processed == False):#to start process for inference and post processing for batch 1
self.batch1_is_processed = True
print('batch 1 process')
p = MultiProcess(target=self.BatchProcessing, args=(1,))
else:#to start process for inference and post processing for batch 2
self.batch1_is_processed = False
print('batch 2 process')
p = MultiProcess(target=self.BatchProcessing, args=(2,))
p.start()
print('BatchProcessing start')
p.join()
print('BatchProcessing join')
count=0
cv2.imshow('Cam: ' + self.camlink, val)
cv2.waitKey(1)
except KeyboardInterrupt:
print('Caught Keyboard interrupt')
except:
e = sys.exc_info()
print('Caught Main Exception')
print(e)
self.StopStreaming()
cv2.destroyAllWindows()
def StopStreaming(self):
print('in stopCamStream')
if self.stopbit is not None:
self.stopbit.set()
while not self.cam_queue.empty():
try:
_ = self.cam_queue.get()
except:
break
self.cam_queue.close()
print("before camProcess.join()")
self.camProcess.join()
print("after camProcess.join()")
def BatchProcessing(self, b_num):
print('module name:', __name__)
if hasattr(os, 'getppid'): # only available on Unix
print('parent process:', os.getppid())
print('process id:', os.getpid())
if __name__ == "__main__":
mc = ProcessPipeline()
mc.BatchStreaming()
我使用了事件信号,如下所示。 这对我的申请来说更直接。
当批处理循环有足够的图像时,发出批处理信号。
#event_tut.py
import random, time
from threading import Event, Thread
event = Event()
def waiter(event, nloops):
count=0
while(count<10):
print("%s. Waiting for the flag to be set." % (i+1))
event.wait() # Blocks until the flag becomes true.
print("Wait complete at:", time.ctime())
event.clear() # Resets the flag.
print('wait exit')
count=count+1
def setter(event, nloops):
for i in range(nloops):
time.sleep(random.randrange(2, 5)) # Sleeps for some time.
event.set()
threads = []
nloops = 10
threads.append(Thread(target=waiter, args=(event, nloops)))
threads[-1].start()
threads.append(Thread(target=setter, args=(event, nloops)))
threads[-1].start()
for thread in threads:
thread.join()
print("All done.")