Python asyncio (aiohttp, aiofiles)

Python asyncio (aiohttp, aiofiles)

我似乎很难理解 pythons asyncio。我没有编写任何代码,因为我看到的所有示例都是一次性的 运行s。创建几个协程,将它们添加到事件循环中,然后 运行 循环,它们 运行 在它们之间切换任务,完成。这对我来说似乎没什么帮助。

我想使用 asyncio 来不中断我应用程序中的操作(使用 pyqt5)。我想创建一些函数,当在 asyncio 事件循环中调用 运行 时,它们完成后会进行回调。

我想象的是。为 asyncio 创建一个单独的线程,创建循环并 运行 永远。创建一些函数 getFile(url, fp)get(url)readFile(file) 等。然后在 UI 中,我有一个带有提交按钮的文本框,用户输入 url,点击提交,下载文件。

但是,我看到的每个示例都看不到如何将协程添加到 运行ning 循环。而且我不知道如何在不添加 运行ning 循环的情况下做我想做的事。

#!/bin/python3
import asyncio
import aiohttp
import threading

loop = asyncio.get_event_loop()

def async_in_thread(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def _get(url, callback):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            result = await response.text()
            callback(result)
            return

def get(url, callback):
    asyncio.ensure_future(_get(url, callback))

thread = threading.Thread(target=async_in_thread, args=(loop, ))

thread.start()

def stop():
    loop.close()

def callme(data):
    print(data)
    stop()

get("http://google.com", callme)

thread.join()

这是我想象的,但它不起作用。

要将协程添加到不同线程中的循环 运行ning,请使用 asyncio.run_coroutine_threadsafe:

def get(url, callback):
    asyncio.run_coroutine_threadsafe(_get(url, callback))

一般来说,当您从 运行 所在的线程外部与事件循环进行交互时,您必须 运行 通过 run_coroutine_threadsafe (协程)或 loop.call_soon_threadsafe(对于函数)。例如,要停止循环,请使用 loop.call_soon_threadsafe(loop.stop)。另请注意,loop.close() 不得在循环回调内调用,因此您应该将该调用放在 async_in_thread 中,紧接在 run_forever() 调用之后,此时循环肯定已停止 运行宁.

asyncio 的另一件事是传递显式 when_done 回调不是惯用的,因为 asyncio 公开了期货的概念(类似于 JavaScript 承诺),它允许将回调附加到尚未- 可用的结果。例如,可以这样写 _get

async def _get(url):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

它不需要 callback 参数,因为任何相关方都可以使用 loop.create_task 将其转换为任务,并使用 add_done_callback 在任务完成时收到通知。例如:

def _get_with_callback(url, callback):
    loop = asyncio.get_event_loop()
    task = loop.create_task(_get(url))
    task.add_done_callback(lambda _fut: callback(task.result()))

在您的情况下,您没有直接处理任务,因为您的代码旨在与来自另一个线程的事件循环进行通信。但是,run_coroutine_threadsafe returns 是一个非常有用的值 - 一个完整的 concurrent.futures.Future,您可以使用它来注册完成的回调。您可以将 future 对象公开给调用者,而不是接受 callback 参数:

def get(url):
    return asyncio.run_coroutine_threadsafe(_get(url), loop)

现在调用者可以选择基于回调的方法:

future = get(url)
# call me when done
future.add_done_callback(some_callback)
# ... proceed with other work ...

或者,在适当的时候,他们甚至可以等待结果:

# give me the response, I'll wait for it
result = get(url).result()

后者根据定义是阻塞的,但由于事件循环安全地运行在不同的线程中,它不受阻塞调用的影响。

安装 QualMash 以顺利集成 Qt 和 asyncio。

项目自述文件中的示例提供了灵感:

import sys
import asyncio
import time

from PyQt5.QtWidgets import QApplication, QProgressBar
from quamash import QEventLoop, QThreadExecutor

app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)  # NEW must set the event loop

progress = QProgressBar()
progress.setRange(0, 99)
progress.show()

async def master():
    await first_50()
    with QThreadExecutor(1) as exec:
        await loop.run_in_executor(exec, last_50)

async def first_50():
    for i in range(50):
        progress.setValue(i)
        await asyncio.sleep(.1)

def last_50():
    for i in range(50,100):
        loop.call_soon_threadsafe(progress.setValue, i)
        time.sleep(.1)

with loop: ## context manager calls .close() when loop completes, and releases all resources
    loop.run_until_complete(master())