Python 并行处理图像批处理

Parallel processing in Python for image batching

我想并行运行两个函数:一个用于图像批处理(流式接收全部 25 张图像以备处理),另一个用于处理已经成批的图像。这两个函数需要并行执行。

所以我有两个主要函数:负责收集一批图像的 BatchStreaming(self),以及负责处理这批图像的 BatchProcessing(self, b_num)。目前 BatchStreaming 运行良好。接收完 25 张图片后,需要对这一批图片进行处理。因此有两项工作需要并行进行,它们是:

(1)BatchStreaming 中的 while 循环需要继续接收下一批图像。

(2)同时需要处理当前批处理的图片

我不确定应该使用进程还是线程。我更倾向于使用进程,因为我想利用 CPU 的所有内核。(Python 的线程同一时刻只在一个 CPU 核心上运行)

于是我有两个问题:(1)进程必须先 join 回主程序,主程序才能继续;但我需要在此期间继续接收下一批图像。

(2)在下面的程序中,调用 BatchProcessing(self, b_num) 时出现如下异常:

Caught Main Exception
(<class 'TypeError'>, TypeError("'module' object is not callable",), <traceback object at 0x7f98635dcfc8>)

可能是什么问题?

代码如下

import multiprocessing as MultiProcess
import time
import vid_streamv3 as vs
import cv2
import sys
import numpy as np
import os
# Frames collected per batch; first axis of the batch buffers.
BATCHSIZE = 25
# Colour channels per frame; last axis of the batch buffers.
CHANNEL = 3
# HEIGHT/WIDTH are not referenced in the visible code -- presumably a
# downscaled frame size used elsewhere; confirm before relying on them.
HEIGHT = 480
WIDTH = 640
# Frame size used to allocate the full-resolution batch buffers.
ORGHEIGHT = 1080
ORGWIDTH = 1920

class ProcessPipeline:
    """Collect camera frames into batches and process each batch in a worker process.

    Frames arrive on ``cam_queue`` from a ``vs.StreamCapture`` process.
    ``BatchStreaming`` copies them alternately into two pre-allocated buffers
    of ``BATCHSIZE`` frames; each time a buffer fills up, a
    ``multiprocessing.Process`` running ``BatchProcessing`` is started for it
    while the loop goes on filling the other buffer.
    """

    def __init__(self):
        """Create the frame queue, camera settings and the two batch buffers."""
        # Current camera: the capture process and its stop event are created
        # later, in BatchStreaming().
        self.camProcess = None
        self.cam_queue = MultiProcess.Queue(maxsize=100)
        self.stopbit = None
        # NOTE(review): credentials are embedded in the URL; avoid committing
        # real ones.
        self.camlink = 'rtsp://root:pass@192.168.0.90/axis-media/media.amp?camera=1' #Add your RTSP cam link
        self.framerate = 25
        # Two buffers so one can be filled while the other batch is processed.
        self.fullsize_batch1 = np.zeros((BATCHSIZE, ORGHEIGHT, ORGWIDTH, CHANNEL), dtype=np.uint8)
        self.fullsize_batch2 = np.zeros((BATCHSIZE, ORGHEIGHT, ORGWIDTH, CHANNEL), dtype=np.uint8)
        # False -> incoming frames go to buffer 1; True -> buffer 1 has been
        # handed to a worker and incoming frames go to buffer 2.
        self.batch1_is_processed = False

    def BatchStreaming(self):
        """Start the camera process and run the batching loop until interrupted.

        Reads ``(cmd, val)`` messages from ``cam_queue``, stores every FRAME
        payload in the active buffer, and launches a batch worker each time
        ``BATCHSIZE`` frames have been collected. The loop only ends on
        ``KeyboardInterrupt`` or another exception; the last worker is then
        joined, the camera process stopped and the preview windows closed.
        """
        #get all cams
        time.sleep(3)
        self.stopbit = MultiProcess.Event()
        self.camProcess = vs.StreamCapture(self.camlink,
                                           self.stopbit,
                                           self.cam_queue,
                                           self.framerate)
        self.camProcess.start()

        count = 0
        worker = None  # most recently started batch worker, if any
        try:
            while True:
                if self.cam_queue.empty():
                    # Back off briefly instead of spinning a core at 100%.
                    time.sleep(0.001)
                    continue

                cmd, val = self.cam_queue.get()
                if cmd != vs.StreamCommands.FRAME or val is None:
                    continue

                print('streaming starts ')
                if not self.batch1_is_processed:
                    self.fullsize_batch1[count] = val
                else:
                    self.fullsize_batch2[count] = val
                count += 1

                if count >= BATCHSIZE:
                    worker = self._start_batch_worker(worker)
                    count = 0

                cv2.imshow('Cam: ' + self.camlink, val)
                cv2.waitKey(1)

        except KeyboardInterrupt:
            print('Caught Keyboard interrupt')

        except Exception:
            e = sys.exc_info()
            print('Caught Main Exception')
            print(e)

        finally:
            # Always release the worker, the camera process and the windows,
            # even if joining the worker itself is interrupted.
            try:
                if worker is not None:
                    worker.join()
            finally:
                self.StopStreaming()
                cv2.destroyAllWindows()

    def _start_batch_worker(self, previous):
        """Launch a worker for the batch that has just been filled.

        ``previous`` is the worker started for the preceding batch (or
        ``None``). It is joined here, right before the next worker starts,
        rather than immediately after its own ``start()``; that is what lets
        streaming of the next batch overlap with processing of the current
        one while keeping at most one worker alive. Returns the new, already
        started ``multiprocessing.Process``.
        """
        if previous is not None:
            previous.join()
            print('BatchProcessing join')

        if not self.batch1_is_processed:
            # Buffer 1 is full: hand it over and switch filling to buffer 2.
            self.batch1_is_processed = True
            print('batch 1 process')
            b_num = 1
        else:
            # Buffer 2 is full: hand it over and switch filling to buffer 1.
            self.batch1_is_processed = False
            print('batch 2 process')
            b_num = 2

        # MultiProcess is the multiprocessing *module*; the callable is
        # MultiProcess.Process. Calling the module itself raised
        # "TypeError: 'module' object is not callable".
        worker = MultiProcess.Process(target=self.BatchProcessing, args=(b_num,))
        worker.start()
        print('BatchProcessing start')
        return worker

    def StopStreaming(self):
        """Signal the camera process to stop, drain the queue and join it.

        Does nothing beyond printing when streaming was never started
        (``stopbit`` is still ``None``).
        """
        print('in stopCamStream')
        if self.stopbit is None:
            return

        self.stopbit.set()
        while not self.cam_queue.empty():
            try:
                _ = self.cam_queue.get()
            except Exception:
                break
        # Close once, after draining. Closing inside the loop shut the queue
        # after the first item and never closed an already-empty queue.
        self.cam_queue.close()
        # NOTE(review): frames enqueued by the capture process after this
        # drain are not consumed; whether that can delay join() depends on
        # StreamCapture's shutdown behaviour, which is not visible here.
        print("before camProcess.join()")
        if self.camProcess is not None:
            self.camProcess.join()
        print("after camProcess.join()")

    def BatchProcessing(self, b_num):
        """Worker entry point for batch ``b_num`` (1 or 2).

        Currently a stub that only prints the module name and the parent and
        own process ids; ``b_num`` is not used yet.
        """
        print('module name:', __name__)
        if hasattr(os, 'getppid'):  # guard for platforms without os.getppid
            print('parent process:', os.getppid())
        print('process id:', os.getpid())


if __name__ == "__main__":
    # Script entry point: build the pipeline and run the streaming loop.
    # The guard keeps this from running when the module is imported.
    pipeline = ProcessPipeline()
    pipeline.BatchStreaming()

我改用了事件(Event)信号,如下所示。这对我的应用来说更直接。

当批处理循环有足够的图像时,发出批处理信号。

#event_tut.py
import random, time
from threading import Event, Thread

# Flag shared by the two threads below: setter() sets it, waiter() blocks on
# it and clears it again after each wake-up.
event = Event()

def waiter(event, nloops):
    """Block on *event* ``nloops`` times, clearing the flag after each wake-up.

    Args:
        event: a ``threading.Event`` that another thread sets.
        nloops: number of wake-ups to wait for before returning.

    NOTE: an Event is a flag, not a counter. If it is set twice before this
    function wakes up once, only one wake-up is observed and a later
    ``wait()`` will block until the flag is set again.
    """
    # The loop index was previously undefined (NameError on the first print)
    # and the iteration count was hard-coded instead of using nloops.
    for i in range(nloops):
        print("%s. Waiting for the flag to be set." % (i+1))
        event.wait()  # Blocks until the flag becomes true.
        print("Wait complete at:", time.ctime())
        event.clear()  # Resets the flag.
        print('wait exit')

def setter(event, nloops):
    """Set *event* ``nloops`` times, pausing 2-4 whole seconds before each set."""
    for _ in range(nloops):
        pause = random.randrange(2, 5)  # 2, 3 or 4 seconds
        time.sleep(pause)
        event.set()

# Run one waiter and one setter thread against the shared event, each for
# the same number of rounds, and wait for both to finish.
nloops = 10
threads = []

for target in (waiter, setter):
    started = Thread(target=target, args=(event, nloops))
    threads.append(started)
    started.start()

for thread in threads:
    thread.join()

print("All done.")