是否可以指定等待代码到 运行 的最长时间?
Is it possible to specify the max amount of time to wait for code to run?
我有一段代码需要从中获取输出:
gps = get_gps_data()
虽然如果从get_gps_data()
获取输出的时间太长,我想取消这个过程并将gps
设置为None
。修改函数是不可能的,所以有没有办法指定等待某些代码的最大时间 运行 并在到达该时间时中止,比如 5 秒?
您可以执行如下操作。这将适用于您想要检查并在一段时间后停止的任何逻辑或功能。您要检查和取消的功能在单独的线程中执行并受到监视。等待3秒后取消
只需将您的代码放在 test
函数中(您可以重命名函数)。 test
函数尝试休眠 100 秒(可以是任何逻辑)。但是主调用 future.result(3)
,只等待 3 秒,否则会抛出 TimeoutError 并遵循一些其他逻辑。
import time
import concurrent.futures as futures
def test():
print('Sleeping for 100 seconds')
time.sleep(100)
# Add your code here
# gps = get_gps_data()
return 'Done'
# Main
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test)
try:
resp = future.result(3)
except futures.TimeoutError:
print('Some other resp')
else:
print(resp)
executor._threads.clear()
futures.thread._threads_queues.clear()
您可以尝试将 test
函数内的等待时间最小化到小于 3 秒的某个值。在这种情况下,将使用从 test
函数返回的原始结果。
我有一段代码需要从中获取输出:
gps = get_gps_data()
虽然如果从get_gps_data()
获取输出的时间太长,我想取消这个过程并将gps
设置为None
。修改函数是不可能的,所以有没有办法指定等待某些代码的最大时间 运行 并在到达该时间时中止,比如 5 秒?
您可以执行如下操作。这将适用于您想要检查并在一段时间后停止的任何逻辑或功能。您要检查和取消的功能在单独的线程中执行并受到监视。等待3秒后取消
只需将您的代码放在 test
函数中(您可以重命名函数)。 test
函数尝试休眠 100 秒(可以是任何逻辑)。但是主调用 future.result(3)
,只等待 3 秒,否则会抛出 TimeoutError 并遵循一些其他逻辑。
import time
import concurrent.futures as futures
def test():
print('Sleeping for 100 seconds')
time.sleep(100)
# Add your code here
# gps = get_gps_data()
return 'Done'
# Main
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test)
try:
resp = future.result(3)
except futures.TimeoutError:
print('Some other resp')
else:
print(resp)
executor._threads.clear()
futures.thread._threads_queues.clear()
您可以尝试将 test
函数内的等待时间最小化到小于 3 秒的某个值。在这种情况下,将使用从 test
函数返回的原始结果。