Python asyncio 任务列表生成而不执行函数

Python asyncio task list generation without executing the function

在使用 asyncio 时,我正在尝试使用列表理解来构建我的任务列表。函数的基本形式如下:

import asyncio
import urllib.request as req
@asyncio.coroutine
def coro(term):
    print(term)
    google = "https://www.google.com/search?q=" + term.replace(" ", "+") + "&num=100&start=0"
    request = req.Request(google, None, headers) 
    (some beautiful soup stuff)

我的目标是使用术语列表来创建我的任务列表:

terms = ["pie", "chicken" ,"things" ,"stuff"]    
tasks=[
    coro("pie"),
    coro("chicken"),
    coro("things"),
    coro("stuff")]

我最初的想法是:

loop = asyncio.get_event_loop()
tasks = [my_coroutine(term) for term in terms]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

这不会创建任务列表,它会在列表理解期间运行函数。有没有一种方法可以使用快捷方式创建任务列表而无需编写每个任务?

这适用于 python 3.5(添加了新的 async-await 语法):

import asyncio

async def coro(term):
    for i in range(3):
        await asyncio.sleep(int(len(term)))  # just sleep
        print("cor1", i, term)

terms = ["pie", "chicken", "things", "stuff"]
tasks = [coro(term) for term in terms]

loop = asyncio.get_event_loop()
cors = asyncio.wait(tasks)
loop.run_until_complete(cors)

你的版本不应该 yield from req.Request(google, None, headers)?并且(那是什么图书馆?)这个图书馆甚至可以与 asyncio 一起使用吗?

(这里是 python <= 3.4 语法的相同代码;缺少的部分与上面相同):

@asyncio.coroutine
def coro(term):
    for i in range(3):
        yield from asyncio.sleep(int(len(term)))  # just sleep
        print("cor1", i, term)

您的 HTTP 客户端不支持 asyncio,您将无法获得预期的结果。试试看 .wait() 是否按预期工作:

import asyncio
import random

@asyncio.coroutine
def my_coroutine(term):
    print("start", term)
    yield from asyncio.sleep(random.uniform(1, 3))
    print("end", term)


terms = ["pie", "chicken", "things", "stuff"]
loop = asyncio.get_event_loop()
tasks = [my_coroutine(term) for term in terms]
print("Here we go!")
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

如果你使用 asyncio.gather() 你会得到一个封装你所有任务的未来,可以用 .cancel() 轻松取消,这里用 python 3.5+ async def/ 演示await 语法(但与 @coroutineyield from 相同):

import asyncio

import random


async def my_coroutine(term):
    print("start", term)
    n = random.uniform(0.2, 1.5)
    await asyncio.sleep(n)
    print("end", term)
    return "Term {} slept for {:.2f} seconds".format(term, n)


async def stop_all():
    """Cancels all still running tasks after one second"""
    await asyncio.sleep(1)
    print("stopping")
    fut.cancel()
    return ":-)"


loop = asyncio.get_event_loop()
terms = ["pie", "chicken", "things", "stuff"]
tasks = (my_coroutine(term) for term in terms)
fut = asyncio.gather(stop_all(), *tasks, return_exceptions=True)

print("Here we go!")
loop.run_until_complete(fut)

for task_result in fut.result():
    if not isinstance(task_result, Exception):
        print("OK", task_result)
    else:
        print("Failed", task_result)

loop.close()

最后,如果您想使用异步 HTTP 客户端,请尝试 aiohttp。首先安装它:

pip install aiohttp

然后试试这个例子,它使用 asyncio.as_completed:

import asyncio

import aiohttp


async def fetch(session, url):
    print("Getting {}...".format(url))
    async with session.get(url) as resp:
        text = await resp.text()
    return "{}: Got {} bytes".format(url, len(text))


async def fetch_all():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay))
                 for delay in (1, 1, 2, 3, 3)]
        for task in asyncio.as_completed(tasks):
            print(await task)
    return "Done."


loop = asyncio.get_event_loop()
resp = loop.run_until_complete(fetch_all())
print(resp)
loop.close()

创建队列和运行事件循环

def main():
    while terms:
        tasks.append(asyncio.create_task(terms.pop())

responses = asyncio.gather(*tasks, return_exception=True)
loop = asyncio.get_event_loop()
loop.run_until_complete(responses)