Python try_decorator 重试 i2c 连接
Python try_decorator to retry i2c connections
我正在通过 I2C 总线与一些硬件通信。
每隔一段时间,由于总线繁忙会出现异常,所以我目前正在使用重试循环来处理这个问题,但是有很多代码重复,我想整理一下。
所以,我尝试写了一个装饰函数来传递各种函数给
例如,此命令初始化总线:
i2c = busio.I2C(board.SCL, board.SDA)
这是我写的装饰器:
def try_decorator(func, name):
MAX_TRIES = 5
i = 1
n = name
while True:
try:
func()
except Exception as e:
print(f'retry {i} for {n}')
i += 1
if i == MAX_TRIES:
print(f'max retries of {MAX_TRIES} reached for {n}, with error: {e}')
break
continue
break
然后我尝试将上面提到的行作为部分传递给它:
i2c = try_decorator(partial(busio.I2C, board.SCL, board.SDA), 'i2c init')
但是当我尝试将 i2c 传递给下一个函数时出现异常:'NoneType' object has no attribute 'try_lock'
我认为这意味着部分实际上没有正确初始化总线。我也尝试过作为 lambda 和函数,但得到了类似的结果。
我该怎么做才能解决这个问题并使我的代码尽可能简单明了?
此外,我尝试在装饰器中使用 func.name 作为 arg(以跟踪异常),而不是将字符串作为 name
传递函数而不是部分函数,但它给出了一个错误,似乎它正在尝试获取 I2C 对象的名称。
编辑:我已经按照 BigBro 对下面的回答更新了我的代码,但是在将 i2c 对象传递给我的下一个函数时我仍然遇到错误(如果我 运行 没有装饰器)。
def retry(f):
MAX_TRIES = 5
def wrapped(*args, **kwargs):
for i in range(MAX_TRIES):
try:
res = f(*args, **kwargs)
return res
except IOError as e:
print(f'retry {i} for {f.__name__}, with error: {e}')
print(f'max retries of {MAX_TRIES} reached for {f.__name__}, with error: {e}')
return wrapped
i2c = retry(busio.I2C(board.SCL, board.SDA))
drv = adafruit_drv2605.DRV2605(i2c)
'function' object has no attribute 'try_lock'
简单的答案是您忘记了 return try_decorator
中函数的结果(因此 return None
)。将 func()
替换为 res = func()
,然后在最后替换 return res
。
但是您的函数并不是真正的装饰器,因为它实际上 return 不是一个函数,并且不能用作 @try_decorator
.
我建议如下:
import random
MAX_RETRIES = 10
def retry_decorator(f):
def wrapped(*args, **kwargs): # Allows arg to be passed to the function instead of using partial
for i in range(MAX_RETRIES):
try:
res = f(*args, **kwargs)
return res
except Exception as e:
print(f'Function {f.__name__} failed {i}/{MAX_RETRIES}: {e}') # f.__name__ is what you're looking for
print(f'Function {f.__name__} didn\'t work after {MAX_RETRIES} retries: {e}')
return wrapped
@retry_decorator
def fail_randomly():
i = random.randint(0, 10)
if i < 7:
raise RuntimeError('Random error')
print('Sucess!')
fail_randomly()
输出
Function fail_randomly failed 0/10: Random error
Function fail_randomly failed 1/10: Random error
Function fail_randomly failed 2/10: Random error
Function fail_randomly failed 3/10: Random error
Function fail_randomly failed 4/10: Random error
Sucess!
最后一件事,我建议更具体地说明捕获的异常,否则事情可能会变得难以调试;)
我正在通过 I2C 总线与一些硬件通信。
每隔一段时间,由于总线繁忙会出现异常,所以我目前正在使用重试循环来处理这个问题,但是有很多代码重复,我想整理一下。
所以,我尝试写了一个装饰函数来传递各种函数给
例如,此命令初始化总线:
i2c = busio.I2C(board.SCL, board.SDA)
这是我写的装饰器:
def try_decorator(func, name):
MAX_TRIES = 5
i = 1
n = name
while True:
try:
func()
except Exception as e:
print(f'retry {i} for {n}')
i += 1
if i == MAX_TRIES:
print(f'max retries of {MAX_TRIES} reached for {n}, with error: {e}')
break
continue
break
然后我尝试将上面提到的行作为部分传递给它:
i2c = try_decorator(partial(busio.I2C, board.SCL, board.SDA), 'i2c init')
但是当我尝试将 i2c 传递给下一个函数时出现异常:'NoneType' object has no attribute 'try_lock'
我认为这意味着部分实际上没有正确初始化总线。我也尝试过作为 lambda 和函数,但得到了类似的结果。
我该怎么做才能解决这个问题并使我的代码尽可能简单明了?
此外,我尝试在装饰器中使用 func.name 作为 arg(以跟踪异常),而不是将字符串作为 name
传递函数而不是部分函数,但它给出了一个错误,似乎它正在尝试获取 I2C 对象的名称。
编辑:我已经按照 BigBro 对下面的回答更新了我的代码,但是在将 i2c 对象传递给我的下一个函数时我仍然遇到错误(如果我 运行 没有装饰器)。
def retry(f):
MAX_TRIES = 5
def wrapped(*args, **kwargs):
for i in range(MAX_TRIES):
try:
res = f(*args, **kwargs)
return res
except IOError as e:
print(f'retry {i} for {f.__name__}, with error: {e}')
print(f'max retries of {MAX_TRIES} reached for {f.__name__}, with error: {e}')
return wrapped
i2c = retry(busio.I2C(board.SCL, board.SDA))
drv = adafruit_drv2605.DRV2605(i2c)
'function' object has no attribute 'try_lock'
简单的答案是您忘记了 return try_decorator
中函数的结果(因此 return None
)。将 func()
替换为 res = func()
,然后在最后替换 return res
。
但是您的函数并不是真正的装饰器,因为它实际上 return 不是一个函数,并且不能用作 @try_decorator
.
我建议如下:
import random
MAX_RETRIES = 10
def retry_decorator(f):
def wrapped(*args, **kwargs): # Allows arg to be passed to the function instead of using partial
for i in range(MAX_RETRIES):
try:
res = f(*args, **kwargs)
return res
except Exception as e:
print(f'Function {f.__name__} failed {i}/{MAX_RETRIES}: {e}') # f.__name__ is what you're looking for
print(f'Function {f.__name__} didn\'t work after {MAX_RETRIES} retries: {e}')
return wrapped
@retry_decorator
def fail_randomly():
i = random.randint(0, 10)
if i < 7:
raise RuntimeError('Random error')
print('Sucess!')
fail_randomly()
输出
Function fail_randomly failed 0/10: Random error
Function fail_randomly failed 1/10: Random error
Function fail_randomly failed 2/10: Random error
Function fail_randomly failed 3/10: Random error
Function fail_randomly failed 4/10: Random error
Sucess!
最后一件事,我建议更具体地说明捕获的异常,否则事情可能会变得难以调试;)