Python try_decorator 重试 i2c 连接

Python try_decorator to retry i2c connections

我正在通过 I2C 总线与一些硬件通信。

每隔一段时间,由于总线繁忙会出现异常,所以我目前正在使用重试循环来处理这个问题,但是有很多代码重复,我想整理一下。

所以,我尝试写了一个装饰函数来传递各种函数给

例如,此命令初始化总线: i2c = busio.I2C(board.SCL, board.SDA)

这是我写的装饰器:

def try_decorator(func, name):
    MAX_TRIES = 5
    i = 1
    n = name
    while True:
        try:
            func()
        except Exception as e:
            print(f'retry {i} for {n}')
            i += 1
            if i == MAX_TRIES:
                print(f'max retries of {MAX_TRIES} reached for {n}, with error: {e}')
                break
            continue
        break

然后我尝试将上面提到的行作为部分传递给它: i2c = try_decorator(partial(busio.I2C, board.SCL, board.SDA), 'i2c init') 但是当我尝试将 i2c 传递给下一个函数时出现异常:'NoneType' object has no attribute 'try_lock' 我认为这意味着部分实际上没有正确初始化总线。我也尝试过作为 lambda 和函数,但得到了类似的结果。

我该怎么做才能解决这个问题并使我的代码尽可能简单明了?

此外,我尝试在装饰器中使用 func.name 作为 arg(以跟踪异常),而不是将字符串作为 name 传递函数而不是部分函数,​​但它给出了一个错误,似乎它正在尝试获取 I2C 对象的名称。

编辑:我已经按照 BigBro 对下面的回答更新了我的代码,但是在将 i2c 对象传递给我的下一个函数时我仍然遇到错误(如果我 运行 没有装饰器)。

def retry(f):
    MAX_TRIES = 5
    def wrapped(*args, **kwargs):
        for i in range(MAX_TRIES):
            try:
                res = f(*args, **kwargs)
                return res
            except IOError as e:
                print(f'retry {i} for {f.__name__}, with error: {e}')
        print(f'max retries of {MAX_TRIES} reached for {f.__name__}, with error: {e}')
    return wrapped

i2c = retry(busio.I2C(board.SCL, board.SDA))

drv = adafruit_drv2605.DRV2605(i2c)

'function' object has no attribute 'try_lock'

简单的答案是您忘记了 return try_decorator 中函数的结果(因此 return None)。将 func() 替换为 res = func(),然后在最后替换 return res

但是您的函数并不是真正的装饰器,因为它实际上 return 不是一个函数,并且不能用作 @try_decorator.

我建议如下:

import random

MAX_RETRIES = 10
def retry_decorator(f):
    def wrapped(*args, **kwargs):  # Allows arg to be passed to the function instead of using partial
        for i in range(MAX_RETRIES):
            try:
                res = f(*args, **kwargs)
                return res
            except Exception as e:
                print(f'Function {f.__name__} failed {i}/{MAX_RETRIES}: {e}')  # f.__name__ is what you're looking for
        print(f'Function {f.__name__} didn\'t work after {MAX_RETRIES} retries: {e}')
    return wrapped


@retry_decorator
def fail_randomly():
    i = random.randint(0, 10)
    if i < 7:
        raise RuntimeError('Random error')
    print('Sucess!')

fail_randomly()

输出

Function fail_randomly failed 0/10: Random error
Function fail_randomly failed 1/10: Random error
Function fail_randomly failed 2/10: Random error
Function fail_randomly failed 3/10: Random error
Function fail_randomly failed 4/10: Random error
Sucess!

最后一件事,我建议更具体地说明捕获的异常,否则事情可能会变得难以调试;)