Python 解释器 locked/freezing 试图 运行 pygetwindow 作为线程
Python interpreter locked/freezing while trying to run pygetwindow as a thread
我正在尝试学习如何使用线程,特别是 concurrent.futures.ThreadPoolExecutor
这是因为我需要 return 一个 numpy.array
来自我想要 运行 的函数同时
最终目标是让一个进程 运行 连接应用程序的视频循环,而另一个进程执行对象检测和 GUI 交互。 concurrent.futures library 中的 result()
关键字允许我这样做。
问题是我的代码 运行s 一次,然后似乎锁定了。我实际上不确定会发生什么,因为当我在调试器中单步执行它时 运行 一次,然后调试器变为空白,我实际上无法单步执行并且没有抛出任何错误。
代码出现锁定行:notepadWindow = pygetwindow.getWindowsWithTitle('Notepad')[0]
我正好得到一个循环,print
语句在循环重新启动后打印,然后在 pygetwindow
处停止
我对 GIL 了解不多,但我尝试在 ThreadPoolExecutor()
上使用 max_workers=1
参数,这两种方式都没有什么不同,我的印象是 concurrent.futures
允许我绕过锁。
我如何 运行 videoLoop
作为单个线程确保每次迭代 return DetectionWindow
?
import cv2 as cv
import numpy as np
import concurrent.futures
from PIL import ImageGrab
import pygetwindow
def videoLoop():
notepadWindow = pygetwindow.getWindowsWithTitle('Notepad')[0]
x1 = notepadWindow.left
y1 = notepadWindow.top
height = notepadWindow.height
width = notepadWindow.width
x2 = x1 + width
y2 = y1 + height
haystack_img = ImageGrab.grab(bbox=(x1, y1, x2, y2))
haystack_img_np = np.array(haystack_img)
DetectionWindow= cv.cvtColor(haystack_img_np, cv.COLOR_BGR2GRAY)
return DetectionWindow
def f1():
with concurrent.futures.ThreadPoolExecutor() as executor:
f1 = executor.submit(videoLoop)
notepadWindow = f1.result()
cv.imshow("Video Loop", notepadWindow)
cv.waitKey(1)
print(f1.result())
while True:
f1()
如果您想要连续的帧流,ThreadPoolExecutor 在这里帮不了您太多。
这是对您的代码的修改,它使用常规的旧 threading.Thread
并将帧(及其捕获时间戳,因为这是异步的)放入 queue.Queue
然后您可以在另一个(或主线程。
线程有一个无限循环,可以通过设置线程的 exit_signal
.
来停止。
(我没有测试这个,因为我目前在Mac,所以可能有错别字或其他问题。)
import queue
import time
import cv2 as cv
import numpy as np
import threading
from PIL import ImageGrab
import pygetwindow
def do_capture():
notepadWindow = pygetwindow.getWindowsWithTitle("Notepad")[0]
x1 = notepadWindow.left
y1 = notepadWindow.top
height = notepadWindow.height
width = notepadWindow.width
x2 = x1 + width
y2 = y1 + height
haystack_img = ImageGrab.grab(bbox=(x1, y1, x2, y2))
return cv.cvtColor(np.array(haystack_img), cv.COLOR_BGR2GRAY)
class VideoCaptureThread(threading.Thread):
def __init__(self, result_queue: queue.Queue) -> None:
super().__init__()
self.exit_signal = threading.Event()
self.result_queue = result_queue
def run(self) -> None:
while not self.exit_signal.wait(0.05):
try:
result = do_capture()
self.result_queue.put((time.time(), result))
except Exception as exc:
print(f"Failed capture: {exc}")
def process_frames(result_queue: queue.Queue):
start_time = time.time()
while time.time() - start_time < 5: # Run for five seconds
frame = result_queue.get()
print(frame)
def main():
result_queue = queue.Queue()
thread = VideoCaptureThread(result_queue=result_queue)
thread.start()
process_frames(result_queue)
thread.exit_signal.set()
thread.join()
if __name__ == "__main__":
main()
我正在尝试学习如何使用线程,特别是 concurrent.futures.ThreadPoolExecutor
这是因为我需要 return 一个 numpy.array
来自我想要 运行 的函数同时
最终目标是让一个进程 运行 连接应用程序的视频循环,而另一个进程执行对象检测和 GUI 交互。 concurrent.futures library 中的 result()
关键字允许我这样做。
问题是我的代码 运行s 一次,然后似乎锁定了。我实际上不确定会发生什么,因为当我在调试器中单步执行它时 运行 一次,然后调试器变为空白,我实际上无法单步执行并且没有抛出任何错误。
代码出现锁定行:notepadWindow = pygetwindow.getWindowsWithTitle('Notepad')[0]
我正好得到一个循环,print
语句在循环重新启动后打印,然后在 pygetwindow
我对 GIL 了解不多,但我尝试在 ThreadPoolExecutor()
上使用 max_workers=1
参数,这两种方式都没有什么不同,我的印象是 concurrent.futures
允许我绕过锁。
我如何 运行 videoLoop
作为单个线程确保每次迭代 return DetectionWindow
?
import cv2 as cv
import numpy as np
import concurrent.futures
from PIL import ImageGrab
import pygetwindow
def videoLoop():
notepadWindow = pygetwindow.getWindowsWithTitle('Notepad')[0]
x1 = notepadWindow.left
y1 = notepadWindow.top
height = notepadWindow.height
width = notepadWindow.width
x2 = x1 + width
y2 = y1 + height
haystack_img = ImageGrab.grab(bbox=(x1, y1, x2, y2))
haystack_img_np = np.array(haystack_img)
DetectionWindow= cv.cvtColor(haystack_img_np, cv.COLOR_BGR2GRAY)
return DetectionWindow
def f1():
with concurrent.futures.ThreadPoolExecutor() as executor:
f1 = executor.submit(videoLoop)
notepadWindow = f1.result()
cv.imshow("Video Loop", notepadWindow)
cv.waitKey(1)
print(f1.result())
while True:
f1()
如果您想要连续的帧流,ThreadPoolExecutor 在这里帮不了您太多。
这是对您的代码的修改,它使用常规的旧 threading.Thread
并将帧(及其捕获时间戳,因为这是异步的)放入 queue.Queue
然后您可以在另一个(或主线程。
线程有一个无限循环,可以通过设置线程的 exit_signal
.
(我没有测试这个,因为我目前在Mac,所以可能有错别字或其他问题。)
import queue
import time
import cv2 as cv
import numpy as np
import threading
from PIL import ImageGrab
import pygetwindow
def do_capture():
notepadWindow = pygetwindow.getWindowsWithTitle("Notepad")[0]
x1 = notepadWindow.left
y1 = notepadWindow.top
height = notepadWindow.height
width = notepadWindow.width
x2 = x1 + width
y2 = y1 + height
haystack_img = ImageGrab.grab(bbox=(x1, y1, x2, y2))
return cv.cvtColor(np.array(haystack_img), cv.COLOR_BGR2GRAY)
class VideoCaptureThread(threading.Thread):
def __init__(self, result_queue: queue.Queue) -> None:
super().__init__()
self.exit_signal = threading.Event()
self.result_queue = result_queue
def run(self) -> None:
while not self.exit_signal.wait(0.05):
try:
result = do_capture()
self.result_queue.put((time.time(), result))
except Exception as exc:
print(f"Failed capture: {exc}")
def process_frames(result_queue: queue.Queue):
start_time = time.time()
while time.time() - start_time < 5: # Run for five seconds
frame = result_queue.get()
print(frame)
def main():
result_queue = queue.Queue()
thread = VideoCaptureThread(result_queue=result_queue)
thread.start()
process_frames(result_queue)
thread.exit_signal.set()
thread.join()
if __name__ == "__main__":
main()