Python 中最简单的 async/await 示例

Simplest async/await example possible in Python

我阅读了很多示例、博客文章,questions/answers 关于 asyncio / async / await Python 3.5+,很多都很复杂,我找到的最简单的可能是 .
它仍然使用 ensure_future,为了学习 Python 中的异步编程,我想看一个更小的例子,以及 必要的最小工具是什么 做一个基本的异步/等待示例。

问题:是否可以给出一个 简单示例来说明 async / await 是如何工作的,只使用这两个关键字 + 代码 运行 异步循环 + 其他 Python 代码但没有其他 asyncio 函数?

示例:像这样:

import asyncio

async def async_foo():
    print("async_foo started")
    await asyncio.sleep(5)
    print("async_foo done")

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()
    print('Do some actions 1')
    await asyncio.sleep(5)
    print('Do some actions 2')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

但没有 ensure_future,并且仍然演示 await / async 是如何工作的。

is it possible to give a simple example showing how async / await works, by using only these two keywords + asyncio.get_event_loop() + run_until_complete + other Python code but no other asyncio functions?

这样就可以编写有效的代码:

import asyncio


async def main():
    print('done!')


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

但是这样就无法证明为什么需要 asyncio。

顺便问一下,为什么需要 asyncio,而不仅仅是纯代码?答案是 - asyncio 允许您在并行化 I/O 阻塞操作(如 reading/writing 到网络)时获得性能优势。要编写有用的示例,您需要使用这些操作的异步实现。

请阅读以获得更详细的解释。

更新:

好的,下面是使用 asyncio.sleep 模仿 I/O 阻塞操作和 asyncio.gather 的示例,它展示了如何 运行 同时进行多个阻塞操作:

import asyncio


async def io_related(name):
    print(f'{name} started')
    await asyncio.sleep(1)
    print(f'{name} finished')


async def main():
    await asyncio.gather(
        io_related('first'),
        io_related('second'),
    )  # 1s + 1s = over 1s


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

first started
second started
first finished
second finished
[Finished in 1.2s]

请注意 io_related 是如何开始的,然后仅一秒钟后,两者都完成了。

为了回答您的问题,我将针对同一问题提供 3 种不同的解决方案。

情况一:正常Python

import time

def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

def sum(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        sleep()
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()
tasks = [
    sum("A", [1, 2]),
    sum("B", [1, 2, 3]),
]
end = time.time()
print(f'Time: {end-start:.2f} sec')

输出:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6

Time: 5.02 sec

案例 2:async/await 做错了

import asyncio
import time

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

async def sum(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await sleep()
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(sum("A", [1, 2])),
    loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')

输出:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6

Time: 5.01 sec

案例 3:async/await 做对了

除了 sleep 函数外与案例 2 相同:

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    await asyncio.sleep(1)

输出:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 3+3
Time: 2.00
Task B: Sum = 6

Time: 3.01 sec

案例 1 和案例 2 给出相同的 5 秒,而案例 3 仅 3 秒。所以async/await做得对更快。

差异的原因在于 sleep 函数的实现。

# case 1
def sleep():
    ...
    time.sleep(1)

# case 2
async def sleep():
    ...
    time.sleep(1)

# case 3
async def sleep():
    ...
    await asyncio.sleep(1)

情况一和情况二是“一样的”: 他们在不允许其他人使用资源的情况下“睡觉”。 而在情况 3 中,它允许在睡眠时访问资源。

在案例2中,我们在普通函数中添加了async。然而,事件循环将 运行 它 而不会中断 。 为什么?因为我们没有说明允许循环在哪里中断您的函数以 运行 另一个任务。

在案例 3 中,我们告诉事件循环确切在哪里中断函数以 运行 另一个任务。具体在哪里?就在这里!

await asyncio.sleep(1)

阅读更多内容here

更新 02/May/2020

考虑阅读

Python 3.7+ 现在有 a simpler API(在我看来)用更简单的措辞(比“ensure_future”更容易记住):你可以使用 create_task return 是一个 Task 对象(如果需要,稍后可以用来取消任务)。

基本示例 1

import asyncio

async def hello(i):
    print(f"hello {i} started")
    await asyncio.sleep(4)
    print(f"hello {i} done")

async def main():
    task1 = asyncio.create_task(hello(1))  # returns immediately, the task is created
    await asyncio.sleep(3)
    task2 = asyncio.create_task(hello(2))
    await task1
    await task2

asyncio.run(main())  # main loop

结果:

hello 1 started
hello 2 started
hello 1 done
hello 2 done


基本示例 2

如果你需要获取这些异步函数的return值,那么gather就派上用场了。以下示例的灵感来自 documentation,但不幸的是,该文档并未显示 gather 的真正用途:获取 return 值!

import asyncio

async def factorial(n):
    f = 1
    for i in range(2, n + 1):
        print(f"Computing factorial({n}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    return f

async def main():
    L = await asyncio.gather(factorial(2), factorial(3), factorial(4))
    print(L)  # [2, 6, 24]

asyncio.run(main())

预期输出:

Computing factorial(2), currently i=2...
Computing factorial(3), currently i=2...
Computing factorial(4), currently i=2...
Computing factorial(3), currently i=3...
Computing factorial(4), currently i=3...
Computing factorial(4), currently i=4...
[2, 6, 24]


PS:即使你使用 asyncio,而不是 triothe tutorial of the latter 也对我理解 Python 异步编程很有帮助。

既然一切都解释得很好,那么让我们运行一些事件循环的例子来比较同步代码和异步代码。

同步代码:

import time

def count():
    time.sleep(1)
    print('1')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('3')

def main():
    for i in range(3):
        count()

if __name__ == "__main__":
    t = time.perf_counter()
    main()
    t2 = time.perf_counter()
    
    print(f'Total time elapsed: {t2:0.2f} seconds')

输出:

1
2
3
1
2
3
1
2
3
Total time elapsed: 9.00 seconds

我们可以看到每个循环计数 运行 在下一个循环开始之前完成。

异步代码:

import asyncio
import time

async def count():
    await asyncio.sleep(1)
    print('1')
    await asyncio.sleep(1)
    print('2')
    await asyncio.sleep(1)
    print('3')

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    t = time.perf_counter()
    asyncio.run(main())
    t2 = time.perf_counter()

    print(f'Total time elapsed: {t2:0.2f} seconds')

输出:

1
1
1
2
2
2
3
3
3
Total time elapsed: 3.00 seconds

另一方面,异步等价物看起来像这样花了三秒到 运行 而不是九秒。 第一个计数周期开始,一旦它达到 await 睡眠,Python 就可以自由地做其他工作,例如开始第二个计数周期,然后是第三个计数周期。 这就是为什么我们拥有所有管子而不是所有管子然后所有三个管子。 在输出编程中并发可以是一个非常有价值的工具。 多处理让操作完成所有多任务工作,在 Python 中,它是 multi-core 并发的唯一选项,它让您的程序在 CPU 的多个内核上执行。 如果使用线程,那么操作系统仍在执行所有多任务处理工作,并且在 cpython 中,全局 intrepeter 锁会阻止 multi-core 异步编程中的并发。 没有操作系统干预 一个进程 一个线程 所以好在有等待时间的任务可以释放CPU,以便其他任务可以使用它。

import asyncio

loop = asyncio.get_event_loop()


async def greeter(name):
    print(f"Hi, {name} you're in a coroutine.")

try:
    print('starting coroutine')
    coro = greeter('LP')
    print('entering event loop')
    loop.run_until_complete(coro)
finally:
    print('closing event loop')
    loop.close()

输出:

starting coroutine
entering event loop
Hi, LP you're in a coroutine.
closing event loop

异步框架需要一个通常称为事件循环的调度程序。这个事件循环跟踪所有 运行ning 任务,当一个函数暂停它时 returns 控制事件循环,然后事件循环会找到另一个函数来启动或恢复,这称为协作多任务处理。 Async IO 提供了一个框架,一个以事件循环为中心的异步框架,它有效地处理 input/output 事件 应用程序显式地与事件循环交互 它注册代码为 运行 然后它让事件循环当资源可用时,调度程序对应用程序代码进行必要的调用。 因此,如果网络服务器打开套接字,然后注册它们以便在它们发生输入事件时被告知事件循环将在有新的传入连接或有数据要读取时提醒服务器代码。 如果从套接字中读取的数据不多于服务器,则将控制权交还给事件循环。

将控制权交还给事件循环的机制取决于 co-routines co-routines 是一种为并发操作而设计的语言结构。 co-routine 可以使用 awake 关键字和另一个 co-routine 暂停执行,并且在暂停时保持 co-routine 状态,允许它从停止的地方恢复 co-routine 可以开始另一个然后等待结果,这样可以更轻松地将任务分解为可重用的部分。

import asyncio

loop = asyncio.get_event_loop()

async def outer():
    print('in outer')
    print('waiting for result 1')
    result1 = await phase1()
    print('waiting for result 2')
    result2 = await phase2(result1)
    return result1, result2


async def phase1():
    print('in phase1')
    return 'phase1 result'

async def phase2(arg):
    print('in phase2')
    return 'result2 derived from {}'.format(arg)

asyncio.run(outer())

输出:

in outer
waiting for result 1
in phase1
waiting for result 2
in phase2

此示例要求必须按顺序执行但可以 运行 与其他操作同时执行的两个阶段。使用 awake 关键字而不是将新的 co-routines 添加到循环中,因为控制流已经在由循环管理的 co-routine 内部。没有必要告诉循环管理新的 co-routines.

import asyncio
import requests

async def fetch_users():
    response = requests.get('https://www.testjsonapi.com/users/')
    users = response.json()
    return users

async def print_users():
    # create an asynchronous task to run concurrently 
    # which wont block executing print statement before it finishes
    response = asyncio.create_task(fetch_users())
    print("Fetching users ")
    # wait to get users data from response before printing users
    users = await response

    for user in users:
        print(f"name : {user['name']} email : {user['email']}")

asyncio.run(print_users())
print("All users printed in console")

输出将如下所示

Fetching users
name : Harjas Malhotra email : harjas@gmail.com
name : Alisha Paul email : alisha@gmail.com
name : Mart Right email : marrk9658@yahoo.com
name : Brad Pitter email : brad@gmail.com
name : Ervin Dugg email : Ervin69@gmail.com 
name : Graham Bell email : Graham@bell.biz
name : James Rush email : james369@hotmail.com
name : Deepak Dev email : deepak@gmail.com
name : Ajay Rich email : therichposts@gmail.com
All users printed in console

让我们看看代码是如何工作的。首先,当 python 将调用 print_users() 时,它不会让下面的 print 语句被执行,直到它完成。因此,在进入 print_users() 之后,将创建一个并发任务,以便它下面的语句可以 运行 与此处的 fetch_users() 任务同时进行。当此任务将 运行 到那时 Fetching users 将打印在控制台中。之后 python 将等待 fetch_users() 的响应,因为在接收之前不应打印用户。 fetch_users() 完成后,所有用户名和电子邮件将打印在控制台中。这样,下面的print_users()打印语句完成后就会被执行。

我不知道为什么,但是关于这个主题的所有解释都太复杂了,或者他们使用的示例没有用 asyncio.sleep()... 到目前为止,我找到的最好的代码示例是:https://codeflex.co/python3-async-await-example/

似乎每个人都专注于将 time.sleep 切换为 asyncio.sleep,但在现实世界中,这总是不可能的。有时您需要进行库调用,这可能会进行 API 调用(例如:从 google 请求签名的 URL)。

以下是您仍然可以使用 time.sleep,但以异步方式使用的方法:

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

async def sum(name, numbers):
    _executor = ThreadPoolExecutor(2)
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await loop.run_in_executor(_executor, sleep)
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(sum("A", [1, 2])),
    loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')

输出:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 3+3
Time: 2.01
Task B: Sum = 6

Time: 3.01 sec

简单..甜蜜..很棒..✅

  import asyncio
  import time
  import random

  async def eat():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Eating")

  async def sleep():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Sleeping")

  async def repeat():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Repeating")

  async def main():
     for x in range(5):
        await asyncio.gather(eat(),sleep(),repeat())
        time.sleep(2)
        print("+","-"*20)

  if __name__ == "__main__":
     t = time.perf_counter()
     asyncio.run(main())
     t2 = time.perf_counter()

     print(f'Total time elapsed: {t2:0.2f} seconds')

甚至认为上面的一些答案我认为有点抽象

from datetime import datetime
import asyncio




async def time_taking(max_val,task_no):
    print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))

    await asyncio.sleep(2)
    value_list = []
    for i in range(0,max_val):
        value_list.append(i)

    print("****FINSIHING UP TASk NO {}  **".format(task_no))
    return value_list



async def test2(task_no):
    await asyncio.sleep(5)
    print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))
    await asyncio.sleep(5)
    print("****FINSIHING UP  TASk NO {}  **".format(task_no))

async def function(value = None):
    tasks = []
    start_time = datetime.now()
    
    # CONCURRENT TASKS
    tasks.append(asyncio.create_task(time_taking(20,1)))
    tasks.append(asyncio.create_task(time_taking(43,2)))
    tasks.append(asyncio.create_task(test2(3)))
    
    # concurrent execution
    lists = await asyncio.gather(*tasks)
    end_time = datetime.now()
    
    time_taken = end_time - start_time
    return lists,time_taken


# run inside event loop 
res,time_taken = asyncio.run(function())

print(res,time_taken)