线程池很慢,速度和串口一样
Thread pool is slow and same speed as serial
我正在尝试加快计算以进行广泛的实时对象检测并对其进行计算。
我将 OpenCV 与线程池和生产者、消费者一起用于视频捕获。但是执行速度和串口一样。
如何提高执行速度?
if __name__ == "__main__":
video_name = '2016-11-18_07-30-01.h264'
cap = cv2.VideoCapture(video_name)
det = detector.CarDetector()
car_tracker = Sort_Algorithm.Sort()
ped_tracker = Sort_Algorithm.Sort()
df_region, df_line = load_filter()
region = Region(df_region)
distance = compute_max_polygon_diagonal(df_region) * 0.1
region_buffered = region.buffer(distance)
threadn = cv2.getNumberOfCPUs()
pool = ThreadPool(processes = 2)
pending = deque()
threaded_mode = True
lock = threading.Lock()
while True:
while len(pending) > 0 and pending[0].ready():
res = pending.popleft().get()
cv2.imshow('video ', res)
if len(pending) < threadn:
ret, frame = cap.read()
if threaded_mode:
t1 = time.time()
H = [-2.01134074616, -16.6502442427, -1314.05715739, -3.35391526592, -22.3546973012, 2683.63584335,
-0.00130731963137, -0.0396207582264, 1]
matrix = np.reshape(H, (3, 3))
dst = cv2.warpPerspective(frame.copy(), matrix, (frame.shape[1], frame.shape[0]))
task = pool.apply_async(pipeline, (lock, frame.copy(),car_tracker, ped_tracker,df_region,region_buffered, df_line, det, dst, matrix))
cv2.imshow('dst', dst)
else:
task = DummyTask(pipeline,(lock, frame.copy(),car_tracker, ped_tracker,df_region, region_buffered, df_line, det, dst, matrix))
pending.append(task)
ch = cv2.waitKey(1)
if ch == ord(' '):
threaded_mode = not threaded_mode
if ch == 27:
break
管道代码:
def pipeline(lock, img, car_tracker, ped_tracker, df_region, region_buffered, df_line, det, dst, H):
lock.acquire()
global point_lists
global df_car_lists
global frame_idx
global counter
global data_peds
global data_cars
global genera_data_pd_cars
global genera_data_pd_peds
car_box, ped_box = det.get_localization(img)
car_detections = car_tracker.update(np.array(car_box))
ped_detections = ped_tracker.update(np.array(ped_box))
saved_region = df_region.values
saved_region = np.delete(saved_region, 2, 1)
frame_idx+=1
cv2.warpPerspective(np.array(df_line, dtype=np.float32), H, (df_line.shape[1], df_line.shape[0]))
cv2.polylines(dst, np.int32([[saved_region]]), False, color=(255, 0, 0))
cv2.polylines(dst, np.int32([np.array(df_line, dtype=np.float32)]), False, color=(255, 0, 0))
for trk in car_detections:
trk = trk.astype(np.int32)
helpers.draw_box_label(img, trk, trk[4]) # Draw the bounding boxes on the
for other in ped_detections:
other = other.astype(np.int32)
helpers.draw_box_label(img, other, other[4]) # Draw the bounding boxes on the
for trk in car_detections:
trk = trk.astype(np.int32)
p = np.array([[((trk[1] + trk[3]) / 2, (trk[0] + trk[2]) / 2)]], dtype=np.float32)
center_pt = cv2.perspectiveTransform(p, H)
ptx = center_pt.T.item(0)
pty = center_pt.T.item(1)
df_cars = compute(trk[4], ptx, pty, frame_idx, df_region, region_buffered, df_line)
genera_data_pd_cars = genera_data_pd_cars.append(df_cars)
for other in ped_detections:
other = other.astype(np.int32)
p = np.array([[((other[1] + other[3]) / 2, (other[0] + other[2]) / 2)]], dtype=np.float32)
center_pt = cv2.perspectiveTransform(p, H)
ptx = center_pt.T.item(0)
pty = center_pt.T.item(1)
df_peds = compute(other[4], ptx, pty, frame_idx, df_region, region_buffered, df_line)
genera_data_pd_peds = genera_data_pd_cars.append(df_peds)
query = "is_in_region == True and is_in_region_now == True"
df_peds = genera_data_pd_peds.query(query)
query = " is_in_region == True"
df_cars = genera_data_pd_cars.query(query)
if len(df_cars)> 1 and len(df_peds) > 1:
df_car_in_t_range_ped = select_slice(df_cars, df_peds)
df_ped_in_t_range_car = select_slice(df_peds, df_cars)
t_abs_crossing_car = df_cars['t_abs_at_crossing'].iloc[0]
t_abs_crossing_ped = df_peds['t_abs_at_crossing'].iloc[0]
dt_crossing = t_abs_crossing_car - t_abs_crossing_ped
is_able_to_pass_before_ped = \
((df_car_in_t_range_ped['t_abs_at_crossing_estimated'] -
t_abs_crossing_ped) > 0).any()
behavior = Behavior( # is_passed_before_ped
dt_crossing < 0,
# is_able_to_stop
df_car_in_t_range_ped['is_able_to_halt'].any(),
# is_too_fast
df_car_in_t_range_ped['is_too_fast'].any(),
# is_close_enough
df_car_in_t_range_ped['is_close_enough'].any(),
# is_able_to_pass_before_ped
is_able_to_pass_before_ped)
interaction = Interaction(trk[4], other[4])
interaction = interaction.assess_behavior(behavior)
code, res, msg = interaction.code, interaction.res, interaction.msg
print(msg)
genera_data_pd_cars = genera_data_pd_cars.iloc[0:0]
genera_data_pd_peds = genera_data_pd_peds.iloc[0:0]
lock.release()
return img
线程在 cpython 中不是并行执行的。尝试改用 ProcessPoolExecutor。
python 中用于 CPU 绑定任务的多线程受 GIL 限制,有效地使单线程 运行 一次。
当然,如果您为 CPU 绑定任务启动多个线程,性能甚至会下降,因为内核和 python 解释器管理这些线程的开销很大。
内核想要调度这些线程,并且 python 想要限制这些线程 运行 同时发生,这会导致发生大量上下文切换,从而降低性能。
如果你在线程中只使用 numpy
那么你会很好,因为 numpy
不受 GIL
的影响,因为它使用原子操作,但我不确定如果 OpenCV
也是如此。
python 中的线程不适用于计算任务。
这是 python 线程的经典问题,考虑使用 multiprocessing
并且有很多关于这个主题的文章,您可能需要查看其中的一些。
我正在尝试加快计算以进行广泛的实时对象检测并对其进行计算。
我将 OpenCV 与线程池和生产者、消费者一起用于视频捕获。但是执行速度和串口一样。
如何提高执行速度?
if __name__ == "__main__":
video_name = '2016-11-18_07-30-01.h264'
cap = cv2.VideoCapture(video_name)
det = detector.CarDetector()
car_tracker = Sort_Algorithm.Sort()
ped_tracker = Sort_Algorithm.Sort()
df_region, df_line = load_filter()
region = Region(df_region)
distance = compute_max_polygon_diagonal(df_region) * 0.1
region_buffered = region.buffer(distance)
threadn = cv2.getNumberOfCPUs()
pool = ThreadPool(processes = 2)
pending = deque()
threaded_mode = True
lock = threading.Lock()
while True:
while len(pending) > 0 and pending[0].ready():
res = pending.popleft().get()
cv2.imshow('video ', res)
if len(pending) < threadn:
ret, frame = cap.read()
if threaded_mode:
t1 = time.time()
H = [-2.01134074616, -16.6502442427, -1314.05715739, -3.35391526592, -22.3546973012, 2683.63584335,
-0.00130731963137, -0.0396207582264, 1]
matrix = np.reshape(H, (3, 3))
dst = cv2.warpPerspective(frame.copy(), matrix, (frame.shape[1], frame.shape[0]))
task = pool.apply_async(pipeline, (lock, frame.copy(),car_tracker, ped_tracker,df_region,region_buffered, df_line, det, dst, matrix))
cv2.imshow('dst', dst)
else:
task = DummyTask(pipeline,(lock, frame.copy(),car_tracker, ped_tracker,df_region, region_buffered, df_line, det, dst, matrix))
pending.append(task)
ch = cv2.waitKey(1)
if ch == ord(' '):
threaded_mode = not threaded_mode
if ch == 27:
break
管道代码:
def pipeline(lock, img, car_tracker, ped_tracker, df_region, region_buffered, df_line, det, dst, H):
lock.acquire()
global point_lists
global df_car_lists
global frame_idx
global counter
global data_peds
global data_cars
global genera_data_pd_cars
global genera_data_pd_peds
car_box, ped_box = det.get_localization(img)
car_detections = car_tracker.update(np.array(car_box))
ped_detections = ped_tracker.update(np.array(ped_box))
saved_region = df_region.values
saved_region = np.delete(saved_region, 2, 1)
frame_idx+=1
cv2.warpPerspective(np.array(df_line, dtype=np.float32), H, (df_line.shape[1], df_line.shape[0]))
cv2.polylines(dst, np.int32([[saved_region]]), False, color=(255, 0, 0))
cv2.polylines(dst, np.int32([np.array(df_line, dtype=np.float32)]), False, color=(255, 0, 0))
for trk in car_detections:
trk = trk.astype(np.int32)
helpers.draw_box_label(img, trk, trk[4]) # Draw the bounding boxes on the
for other in ped_detections:
other = other.astype(np.int32)
helpers.draw_box_label(img, other, other[4]) # Draw the bounding boxes on the
for trk in car_detections:
trk = trk.astype(np.int32)
p = np.array([[((trk[1] + trk[3]) / 2, (trk[0] + trk[2]) / 2)]], dtype=np.float32)
center_pt = cv2.perspectiveTransform(p, H)
ptx = center_pt.T.item(0)
pty = center_pt.T.item(1)
df_cars = compute(trk[4], ptx, pty, frame_idx, df_region, region_buffered, df_line)
genera_data_pd_cars = genera_data_pd_cars.append(df_cars)
for other in ped_detections:
other = other.astype(np.int32)
p = np.array([[((other[1] + other[3]) / 2, (other[0] + other[2]) / 2)]], dtype=np.float32)
center_pt = cv2.perspectiveTransform(p, H)
ptx = center_pt.T.item(0)
pty = center_pt.T.item(1)
df_peds = compute(other[4], ptx, pty, frame_idx, df_region, region_buffered, df_line)
genera_data_pd_peds = genera_data_pd_cars.append(df_peds)
query = "is_in_region == True and is_in_region_now == True"
df_peds = genera_data_pd_peds.query(query)
query = " is_in_region == True"
df_cars = genera_data_pd_cars.query(query)
if len(df_cars)> 1 and len(df_peds) > 1:
df_car_in_t_range_ped = select_slice(df_cars, df_peds)
df_ped_in_t_range_car = select_slice(df_peds, df_cars)
t_abs_crossing_car = df_cars['t_abs_at_crossing'].iloc[0]
t_abs_crossing_ped = df_peds['t_abs_at_crossing'].iloc[0]
dt_crossing = t_abs_crossing_car - t_abs_crossing_ped
is_able_to_pass_before_ped = \
((df_car_in_t_range_ped['t_abs_at_crossing_estimated'] -
t_abs_crossing_ped) > 0).any()
behavior = Behavior( # is_passed_before_ped
dt_crossing < 0,
# is_able_to_stop
df_car_in_t_range_ped['is_able_to_halt'].any(),
# is_too_fast
df_car_in_t_range_ped['is_too_fast'].any(),
# is_close_enough
df_car_in_t_range_ped['is_close_enough'].any(),
# is_able_to_pass_before_ped
is_able_to_pass_before_ped)
interaction = Interaction(trk[4], other[4])
interaction = interaction.assess_behavior(behavior)
code, res, msg = interaction.code, interaction.res, interaction.msg
print(msg)
genera_data_pd_cars = genera_data_pd_cars.iloc[0:0]
genera_data_pd_peds = genera_data_pd_peds.iloc[0:0]
lock.release()
return img
线程在 cpython 中不是并行执行的。尝试改用 ProcessPoolExecutor。
python 中用于 CPU 绑定任务的多线程受 GIL 限制,有效地使单线程 运行 一次。
当然,如果您为 CPU 绑定任务启动多个线程,性能甚至会下降,因为内核和 python 解释器管理这些线程的开销很大。
内核想要调度这些线程,并且 python 想要限制这些线程 运行 同时发生,这会导致发生大量上下文切换,从而降低性能。
如果你在线程中只使用 numpy
那么你会很好,因为 numpy
不受 GIL
的影响,因为它使用原子操作,但我不确定如果 OpenCV
也是如此。
python 中的线程不适用于计算任务。
这是 python 线程的经典问题,考虑使用 multiprocessing
并且有很多关于这个主题的文章,您可能需要查看其中的一些。