如何创建一个可以包装协程或函数的 Python 装饰器?

How to create a Python decorator that can wrap either coroutine or function?

我正在尝试制作一个装饰器来包装协程或函数。

我尝试的第一件事是包装器中的简单重复代码:

def duration(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_ts = time.time()
        result = func(*args, **kwargs)
        dur = time.time() - start_ts
        print('{} took {:.2} seconds'.format(func.__name__, dur))
        return result

    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        start_ts = time.time()
        result = await func(*args, **kwargs)
        dur = time.time() - start_ts
        print('{} took {:.2} seconds'.format(func.__name__, dur))
        return result

    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    else:
        return wrapper

这可行,但我想避免代码重复,因为这并不比编写两个单独的装饰器好多少。

然后我尝试使用 class:

制作装饰器
class SyncAsyncDuration:
    def __init__(self):
        self.start_ts = None

    def __call__(self, func):
        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            self.setup(func, args, kwargs)
            result = func(*args, **kwargs)
            self.teardown(func, args, kwargs)
            return result

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            self.setup(func, args, kwargs)
            result = await func(*args, **kwargs)
            self.teardown(func, args, kwargs)
            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        else:
            return sync_wrapper

    def setup(self, func, args, kwargs):
        self.start_ts = time.time()

    def teardown(self, func, args, kwargs):
        dur = time.time() - self.start_ts
        print('{} took {:.2} seconds'.format(func.__name__, dur))

这在某些情况下对我来说效果很好,但在这个解决方案中我不能在 withtry 中放置一个函数声明。 有什么方法可以在不重复代码的情况下创建装饰器吗?

也许您可以找到更好的方法,但是,例如,您可以将包装逻辑移至某个上下文管理器以防止代码重复:

import asyncio
import functools
import time
from contextlib import contextmanager


def duration(func):
    @contextmanager
    def wrapping_logic():
        start_ts = time.time()
        yield
        dur = time.time() - start_ts
        print('{} took {:.2} seconds'.format(func.__name__, dur))

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not asyncio.iscoroutinefunction(func):
            with wrapping_logic():
                return func(*args, **kwargs)
        else:
            async def tmp():
                with wrapping_logic():
                    return (await func(*args, **kwargs))
            return tmp()
    return wrapper

对我来说,@mikhail-gerasimov 接受的答案不适用于异步 FastAPI 方法(尽管它确实适用于 FastAPI 之外的普通函数和协程函数)。但是,我在 github 上找到了 this 示例,它确实可以使用 fastapi 方法。改编(略微)如下:

def duration(func):

    async def helper(func, *args, **kwargs):
        if asyncio.iscoroutinefunction(func):
            print(f"this function is a coroutine: {func.__name__}")
            return await func(*args, **kwargs)
        else:
            print(f"not a coroutine: {func.__name__}")
            return func(*args, **kwargs)

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_ts = time.time()
        result = await helper(func, *args, **kwargs)
        dur = time.time() - start_ts
        print('{} took {:.2} seconds'.format(func.__name__, dur))

        return result

    return wrapper

或者,如果您想保留上下文管理器,您也可以这样做:

def duration(func):
    """ decorator that can take either coroutine or normal function """
    @contextmanager
    def wrapping_logic():
        start_ts = time.time()
        yield
        dur = time.time() - start_ts
        print('{} took {:.2} seconds'.format(func.__name__, dur))

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        if not asyncio.iscoroutinefunction(func):
            with wrapping_logic():
                return func(*args, **kwargs)
        else:
            with wrapping_logic():
                return (await func(*args, **kwargs))
    return wrapper

这个和公认的答案差别不大。主要是我们只需要创建一个异步包装器并等待函数,如果函数是协程的话。

在我的测试中,此示例代码适用于装饰函数中的 try/except 块以及 with 语句。

我仍然不清楚为什么包装器需要异步 FastAPI 方法。

与 Anatoly 一致,此解决方案将以前的答案放在一起并确保保留 func 的原始类型(如果同步保持装饰函数同步,如果异步保持异步):

import time
import asyncio
from contextlib import contextmanager
import functools

def decorate_sync_async(decorating_context, func):
    if asyncio.iscoroutinefunction(func):
        async def decorated(*args, **kwargs):
            with decorating_context():
                return (await func(*args, **kwargs))
    else:
        def decorated(*args, **kwargs):
            with decorating_context():
                return func(*args, **kwargs)

    return functools.wraps(func)(decorated)

@contextmanager
def wrapping_logic(func_name):
    start_ts = time.time()
    yield
    dur = time.time() - start_ts
    print('{} took {:.2} seconds'.format(func_name, dur))


def duration(func):
    timing_context = lambda: wrapping_logic(func.__name__)
    return decorate_sync_async( timing_context, func )

decorate_sync_async 现在可以与任何包装逻辑 (contextmanager) 重用,以创建适用于同步和异步功能的装饰器。

要使用它(并检查它):

@duration
def sync_hello():
    print('sync_hello')

@duration
async def async_hello():
    await asyncio.sleep(0.1)
    print('async_hello')

async def main():
    print(f"is {sync_hello.__name__} async? "
        f"{asyncio.iscoroutinefunction(sync_hello)}") # False
    sync_hello()

    print(f"is {async_hello.__name__} async? "
        f"{asyncio.iscoroutinefunction(async_hello)}") # True
    await async_hello()


if __name__ == '__main__':
    asyncio.run(main())

输出:

sync_hello async? False
sync_hello
sync_hello took 0.0 seconds
is async_hello async? True
async_hello
async_hello took 0.1 seconds