线程池很慢,速度和串口一样

Thread pool is slow and same speed as serial

我正在尝试加快计算以进行广泛的实时对象检测并对其进行计算。

我将 OpenCV 与线程池和生产者、消费者一起用于视频捕获。但是执行速度和串口一样。

如何提高执行速度?

if __name__ == "__main__":
    video_name = '2016-11-18_07-30-01.h264'

    cap = cv2.VideoCapture(video_name)

    det = detector.CarDetector()
    car_tracker = Sort_Algorithm.Sort()
    ped_tracker = Sort_Algorithm.Sort()
    df_region, df_line = load_filter()
    region = Region(df_region)
    distance = compute_max_polygon_diagonal(df_region) * 0.1
    region_buffered = region.buffer(distance)

    threadn = cv2.getNumberOfCPUs()
    pool = ThreadPool(processes = 2)
    pending = deque()
    threaded_mode = True
    lock = threading.Lock()
    while True:
        while len(pending) > 0 and pending[0].ready():
            res = pending.popleft().get()
            cv2.imshow('video ', res)

        if  len(pending) < threadn:
            ret, frame = cap.read()
            if threaded_mode:
                t1 = time.time()
                H = [-2.01134074616, -16.6502442427, -1314.05715739, -3.35391526592, -22.3546973012, 2683.63584335,
                     -0.00130731963137, -0.0396207582264, 1]
                matrix = np.reshape(H, (3, 3))
                dst = cv2.warpPerspective(frame.copy(), matrix, (frame.shape[1], frame.shape[0]))
                task = pool.apply_async(pipeline, (lock, frame.copy(),car_tracker, ped_tracker,df_region,region_buffered, df_line, det, dst, matrix))

                cv2.imshow('dst', dst)
            else:
                task = DummyTask(pipeline,(lock, frame.copy(),car_tracker, ped_tracker,df_region, region_buffered, df_line, det, dst, matrix))

            pending.append(task)
        ch = cv2.waitKey(1)
        if ch == ord(' '):
            threaded_mode = not threaded_mode
        if ch == 27:
            break

管道代码:

def pipeline(lock, img, car_tracker, ped_tracker, df_region, region_buffered, df_line, det, dst, H):
    lock.acquire()
    global point_lists
    global df_car_lists
    global frame_idx
    global counter
    global  data_peds
    global  data_cars
    global  genera_data_pd_cars
    global  genera_data_pd_peds

    car_box, ped_box = det.get_localization(img)
    car_detections = car_tracker.update(np.array(car_box))
    ped_detections = ped_tracker.update(np.array(ped_box))
    saved_region = df_region.values
    saved_region = np.delete(saved_region, 2, 1)
    frame_idx+=1
    cv2.warpPerspective(np.array(df_line, dtype=np.float32), H, (df_line.shape[1], df_line.shape[0]))

    cv2.polylines(dst, np.int32([[saved_region]]), False, color=(255, 0, 0))
    cv2.polylines(dst, np.int32([np.array(df_line, dtype=np.float32)]), False, color=(255, 0, 0))

    for trk in car_detections:
        trk = trk.astype(np.int32)
        helpers.draw_box_label(img, trk, trk[4])  # Draw the bounding boxes on the

    for other in ped_detections:
            other = other.astype(np.int32)
            helpers.draw_box_label(img, other, other[4])  # Draw the bounding boxes on the

    for trk in car_detections:
        trk = trk.astype(np.int32)
        p = np.array([[((trk[1] + trk[3]) / 2,  (trk[0] + trk[2]) / 2)]], dtype=np.float32)
        center_pt = cv2.perspectiveTransform(p, H)
        ptx = center_pt.T.item(0)
        pty = center_pt.T.item(1)
        df_cars = compute(trk[4], ptx, pty, frame_idx, df_region, region_buffered, df_line)
        genera_data_pd_cars = genera_data_pd_cars.append(df_cars)
        for other in ped_detections:
            other = other.astype(np.int32)
            p = np.array([[((other[1] + other[3]) / 2, (other[0] + other[2]) / 2)]], dtype=np.float32)
            center_pt = cv2.perspectiveTransform(p, H)
            ptx = center_pt.T.item(0)
            pty = center_pt.T.item(1)
            df_peds = compute(other[4], ptx, pty, frame_idx, df_region, region_buffered, df_line)
            genera_data_pd_peds = genera_data_pd_cars.append(df_peds)
            query = "is_in_region == True and is_in_region_now == True"
            df_peds = genera_data_pd_peds.query(query)
            query = " is_in_region == True"
            df_cars = genera_data_pd_cars.query(query)
            if len(df_cars)> 1 and len(df_peds) > 1:

                df_car_in_t_range_ped = select_slice(df_cars, df_peds)
                df_ped_in_t_range_car = select_slice(df_peds, df_cars)
                t_abs_crossing_car = df_cars['t_abs_at_crossing'].iloc[0]
                t_abs_crossing_ped = df_peds['t_abs_at_crossing'].iloc[0]

                dt_crossing = t_abs_crossing_car - t_abs_crossing_ped

                is_able_to_pass_before_ped = \
                    ((df_car_in_t_range_ped['t_abs_at_crossing_estimated'] -
                      t_abs_crossing_ped) > 0).any()

                behavior = Behavior(  # is_passed_before_ped
                    dt_crossing < 0,
                    # is_able_to_stop
                    df_car_in_t_range_ped['is_able_to_halt'].any(),
                    # is_too_fast
                    df_car_in_t_range_ped['is_too_fast'].any(),
                    # is_close_enough
                    df_car_in_t_range_ped['is_close_enough'].any(),
                    # is_able_to_pass_before_ped
                    is_able_to_pass_before_ped)

                interaction = Interaction(trk[4], other[4])
                interaction = interaction.assess_behavior(behavior)
                code, res, msg = interaction.code, interaction.res, interaction.msg
                print(msg)
                genera_data_pd_cars = genera_data_pd_cars.iloc[0:0]
                genera_data_pd_peds = genera_data_pd_peds.iloc[0:0]
    lock.release()
    return img

线程在 cpython 中不是并行执行的。尝试改用 ProcessPoolExecutor。

python 中用于 CPU 绑定任务的多线程受 GIL 限制,有效地使单线程 运行 一次。

当然,如果您为 CPU 绑定任务启动多个线程,性能甚至会下降,因为内核和 python 解释器管理这些线程的开销很大。

内核想要调度这些线程,并且 python 想要限制这些线程 运行 同时发生,这会导致发生大量上下文切换,从而降低性能。

如果你在线程中只使用 numpy 那么你会很好,因为 numpy 不受 GIL 的影响,因为它使用原子操作,但我不确定如果 OpenCV 也是如此。

python 中的线程不适用于计算任务。

这是 python 线程的经典问题,考虑使用 multiprocessing 并且有很多关于这个主题的文章,您可能需要查看其中的一些。