在不冻结 GUI 的情况下一起使用 asyncio 和 Tkinter(或另一个 GUI 库)

Use asyncio and Tkinter (or another GUI lib) together without freezing the GUI

我想将 asyncio 与 tkinter GUI 结合使用。我刚接触 asyncio,对它的理解还不够深入。单击第一个按钮时,此处的示例会启动 10 个任务。每个任务只是用 sleep() 模拟若干秒的工作。

示例代码在 Python 3.6.4rc1 上运行良好。但问题是 GUI 被冻结了:当我按下第一个按钮并启动 10 个异步任务后,在所有任务完成之前,我无法按下 GUI 中的第二个按钮。GUI 永远不应该冻结——这是我的目标。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from tkinter import *
from tkinter import messagebox
import asyncio
import random

def do_freezed():
    """Button handler that proves the GUI still responds by showing a dialog."""
    text = 'Tkinter is reacting.'
    messagebox.showinfo(message=text)

def do_tasks():
    """Button handler that runs the asyncio workload to completion.

    The call blocks inside ``run_until_complete`` until ``do_urls`` has
    finished, and the event loop is closed afterwards in every case.
    """
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(do_urls())
    finally:
        # Always release the loop, even when the workload raised.
        event_loop.close()

async def one_url(url):
    """Simulate fetching one URL by sleeping 1-15 seconds.

    Returns a tab-separated description of the url and the seconds slept.
    """
    delay = random.randint(1, 15)
    await asyncio.sleep(delay)
    summary = 'url: {}\tsec: {}'.format(url, delay)
    return summary

async def do_urls():
    """Run ten simulated URL tasks concurrently and print their results.

    The coroutines are scheduled with ``asyncio.gather`` instead of being
    handed to ``asyncio.wait``: ``wait`` rejects bare coroutine objects on
    Python 3.11+ and returns an unordered set, while ``gather`` wraps them in
    tasks itself and returns the results in submission order.
    """
    results = await asyncio.gather(*(one_url(url) for url in range(10)))
    print('\n'.join(results))


if __name__ == '__main__':
    # Build the window: one button starts the asyncio work, the other is a
    # responsiveness probe.
    root = Tk()

    buttonT = Button(root, command=do_tasks, text='Asyncio Tasks')
    buttonT.pack()

    buttonX = Button(root, command=do_freezed, text='Freezed???')
    buttonX.pack()

    # Hand control to Tk's own event loop.
    root.mainloop()

一个附带问题

……是由于下面这个错误,我无法再次运行这些任务。

Exception in Tkinter callback
Traceback (most recent call last):
  File "/usr/lib/python3.6/tkinter/__init__.py", line 1699, in __call__
    return self.func(*args)
  File "./tk_simple.py", line 17, in do_tasks
    loop.run_until_complete(do_urls())
  File "/usr/lib/python3.6/asyncio/base_events.py", line 443, in run_until_complete
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

多线程

多线程是一种可能的解决方案吗?只有两个线程 - 每个循环都有自己的线程?

编辑:在查看了这个问题及其答案后,它几乎与所有 GUI 库(例如 PygObject/Gtk、wxWidgets、Qt 等)有关。

同时运行两个事件循环是一个可疑的做法。但是,由于 root.mainloop 只是重复调用 root.update,因此可以把 update 作为 asyncio 任务重复调用来模拟主循环。下面是一个这样做的测试程序。我推测在 tkinter 任务之外再添加 asyncio 任务也是可行的。我用 3.7.0a2 检查过,它仍然可以运行。

"""Proof of concept: integrate tkinter, asyncio and async iterator.

Terry Jan Reedy, 2016 July 25
"""

import asyncio
from random import randrange as rr
import tkinter as tk


class App(tk.Tk):
    """Tk root window whose event processing is driven by an asyncio loop.

    Two tasks are scheduled on the supplied loop at construction time: an
    animation (``rotator``) and a periodic ``update()`` pump (``updater``)
    that stands in for ``mainloop``.
    """

    def __init__(self, loop, interval=1/120):
        """Create the window and schedule both tasks on *loop*.

        *interval* is the delay in seconds between ``update()`` calls.
        """
        super().__init__()
        self.loop = loop
        # Closing the window must also cancel the tasks and stop the loop.
        self.protocol("WM_DELETE_WINDOW", self.close)
        self.tasks = [
            loop.create_task(self.rotator(1/60, 2)),
            loop.create_task(self.updater(interval)),
        ]

    async def rotator(self, interval, d_per_tick):
        """Animate a chord that grows by *d_per_tick* degrees every *interval* s."""
        canvas = tk.Canvas(self, height=600, width=600)
        canvas.pack()
        angle, fill = 0, 'black'
        chord = canvas.create_arc(100, 100, 500, 500, style=tk.CHORD,
                                  start=0, extent=angle, fill=fill)
        # asyncio.sleep returns its second argument (True), so this loops
        # until the task is cancelled.
        while await asyncio.sleep(interval, True):
            angle, fill = deg_color(angle, d_per_tick, fill)
            canvas.itemconfigure(chord, extent=angle, fill=fill)

    async def updater(self, interval):
        """Process pending Tk events every *interval* seconds, forever."""
        while True:
            self.update()
            await asyncio.sleep(interval)

    def close(self):
        """Cancel the scheduled tasks, stop the loop and destroy the window."""
        for pending in self.tasks:
            pending.cancel()
        self.loop.stop()
        self.destroy()


def deg_color(deg, d_per_tick, color):
    """Advance the angle by one tick and pick a new colour on wrap-around.

    Returns ``(new_deg, new_color)``. While the advanced angle stays below
    360 the colour is unchanged; once it reaches 360 the angle is reduced
    modulo 360 and a random ``#rrggbb`` colour is generated.
    """
    advanced = deg + d_per_tick
    if advanced < 360:
        return advanced, color
    red, green, blue = rr(0, 256), rr(0, 256), rr(0, 256)
    return advanced % 360, '#%02x%02x%02x' % (red, green, blue)

# Obtain the event loop that drives both the asyncio tasks and the Tk updates.
loop = asyncio.get_event_loop()
# Constructing App schedules its rotator and updater tasks on the loop.
app = App(loop)
# Blocks until App.close() calls loop.stop().
loop.run_forever()
loop.close()

随着时间间隔的减少,tk 更新开销和时间分辨率都会增加。对于 gui 更新,相对于动画,每秒 20 次可能就足够了。

我最近成功运行了包含 tkinter 调用和 await 的 async def 协程,并让它们与 mainloop 配合工作。该原型使用了 asyncio 的 Task 和 Future,但我不确定再添加普通的 asyncio 任务是否可行。如果想让 asyncio 任务和 tkinter 任务一起运行,我认为在 asyncio 循环中运行 tk update 是更好的做法。

编辑:至少按上面的用法,async def 协程中的异常会终止该协程,但异常会在某处被捕获并丢弃。这种静默的错误非常令人讨厌。

EDIT2:附加代码和注释 https://bugs.python.org/issue27546

您可以在按下 Button 后通过在正确的位置添加对 root.update_idletasks() 的调用来保持 GUI 活动:

from tkinter import *
from tkinter import messagebox
import asyncio
import random

def do_freezed():
    """Show an info dialog so the user can verify the GUI is responsive."""
    messagebox.showinfo(
        message='Tkinter is reacting.',
    )

def do_tasks():
    """Button handler that runs the asyncio workload to completion.

    A fresh event loop is created for every click and closed when the work
    is done. The original fetched the shared default loop and closed it, so
    a second click failed with ``RuntimeError: Event loop is closed``.
    """
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(do_urls())
    finally:
        # The loop is private to this call, so closing it is safe.
        loop.close()

async def one_url(url):
    """Simulate fetching one URL by sleeping 1-15 seconds.

    Before sleeping, Tk's idle tasks are flushed on the module-level
    ``root`` window. Returns a tab-separated url/seconds description.
    """
    pause = random.randint(1, 15)
    # Give tkinter a chance to redraw before this task suspends.
    root.update_idletasks()
    await asyncio.sleep(pause)
    return 'url: {}\tsec: {}'.format(url, pause)

async def do_urls():
    """Run ten simulated URL tasks concurrently and print their results.

    ``asyncio.gather`` replaces ``asyncio.wait`` here because ``wait`` no
    longer accepts bare coroutine objects (TypeError on Python 3.11+) and
    returns results as an unordered set; ``gather`` keeps submission order.
    """
    results = await asyncio.gather(*(one_url(url) for url in range(10)))
    print('\n'.join(results))


if __name__ == '__main__':
    # ``root`` stays a module-level name because one_url() refers to it.
    root = Tk()

    buttonT = Button(root, text='Asyncio Tasks', command=do_tasks)
    buttonT.pack()

    buttonX = Button(root, text='Freezed???', command=do_freezed)
    buttonX.pack()

    root.mainloop()

在对您的代码稍作修改后,我在主线程中创建了 asyncio event_loop 并将其作为参数传递给 asyncio 线程。现在 Tkinter 在获取 url 时不会冻结。

from tkinter import *
from tkinter import messagebox
import asyncio
import threading
import random

def _asyncio_thread(async_loop):
    """Thread target: drive ``do_urls`` to completion on *async_loop*."""
    workload = do_urls()
    async_loop.run_until_complete(workload)


def do_tasks(async_loop):
    """Button handler: run the asyncio workload on a separate thread."""
    worker = threading.Thread(target=_asyncio_thread, args=(async_loop,))
    worker.start()

    
async def one_url(url):
    """Simulate fetching one URL by sleeping 1-8 seconds.

    Returns a tab-separated description of the url and the seconds slept.
    """
    seconds = random.randint(1, 8)
    await asyncio.sleep(seconds)
    report = 'url: {}\tsec: {}'.format(url, seconds)
    return report

async def do_urls():
    """Run ten simulated URL tasks concurrently and print their results.

    Uses ``asyncio.gather`` because ``asyncio.wait`` rejects bare coroutine
    objects on Python 3.11+ and does not preserve order; ``gather`` wraps
    the coroutines in tasks and returns results in submission order.
    """
    results = await asyncio.gather(*(one_url(url) for url in range(10)))
    print('\n'.join(results))


def do_freezed():
    """Button handler that pops up a dialog to show the GUI is responsive."""
    notice = 'Tkinter is reacting.'
    messagebox.showinfo(message=notice)

def main(async_loop):
    """Build the two-button window and run Tk's main loop.

    *async_loop* is forwarded to ``do_tasks`` each time the first button is
    clicked.
    """
    root = Tk()

    def start_tasks():
        # Tk commands take no arguments, so capture the loop in a closure.
        do_tasks(async_loop)

    tasks_button = Button(root, text='Asyncio Tasks', command=start_tasks)
    tasks_button.pack()
    probe_button = Button(root, text='Freezed???', command=do_freezed)
    probe_button.pack()
    root.mainloop()

if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and raises on recent Python versions. The loop is
    # only ever run from the worker thread started by do_tasks().
    async_loop = asyncio.new_event_loop()
    main(async_loop)

我成功地在应用程序启动之初就在另一个线程上运行一个 I/O 循环,并使用 asyncio.run_coroutine_threadsafe(..) 把任务提交给它。

令我感到惊讶的是,我可以在另一个 asyncio 循环/线程上修改 tkinter 小部件。也许它对我有效只是侥幸——但它确实有效。

请注意,当 asyncio 任务正在进行时,另一个按钮仍然有效并能响应。我总是喜欢对按钮做禁用/启用处理,这样就不会意外触发多个任务,但这只是一个 UI 细节。

import threading
from functools import partial
from tkinter import *
from tkinter import messagebox
import asyncio
import random


# Please wrap all this code in a nice App class, of course

def _run_aio_loop(loop):
    """Thread target: make *loop* the thread's current loop and run it forever."""
    asyncio.set_event_loop(loop)
    # Blocks this thread until loop.stop() is called.
    loop.run_forever()
# Dedicated asyncio loop running on its own background thread.
aioloop = asyncio.new_event_loop()
# daemon=True is optional, depending on how you plan to shut the app down.
t = threading.Thread(target=_run_aio_loop, args=(aioloop,), daemon=True)
t.start()

# Placeholder; rebound to the real button in the __main__ block.
buttonT = None

def do_freezed():
    """Button handler: display a dialog to confirm the GUI is not frozen."""
    dialog_text = 'Tkinter is reacting.'
    messagebox.showinfo(message=dialog_text)

def do_tasks():
    """Button handler: disable the button and submit the work to ``aioloop``."""
    # Prevent a second click while the work is in flight.
    buttonT.configure(state=DISABLED)
    workload = do_urls()
    asyncio.run_coroutine_threadsafe(workload, aioloop)

async def one_url(url):
    """Simulate fetching one URL by sleeping 1-3 seconds.

    Returns a tab-separated description of the url and the seconds slept.
    """
    nap = random.randint(1, 3)
    await asyncio.sleep(nap)
    return 'url: {}\tsec: {}'.format(url, nap)

async def do_urls():
    """Run three simulated URL tasks, print the results, re-enable the button.

    ``asyncio.gather`` replaces ``asyncio.wait`` because ``wait`` rejects
    bare coroutine objects on Python 3.11+ and returns an unordered set.
    The button is re-enabled in ``finally`` so a failing task cannot leave
    it disabled for good.
    """
    try:
        results = await asyncio.gather(*(one_url(url) for url in range(3)))
        print('\n'.join(results))
    finally:
        # Called from the asyncio thread; the original author observed that
        # Tk tolerated this cross-thread call.
        buttonT.configure(state=NORMAL)


if __name__ == '__main__':
    root = Tk()

    # ``buttonT`` is module-level because do_tasks()/do_urls() toggle its state.
    buttonT = Button(root, command=do_tasks, text='Asyncio Tasks')
    buttonT.pack()

    buttonX = Button(root, command=do_freezed, text='Freezed???')
    buttonX.pack()

    root.mainloop()

我来晚了一点,但如果您的目标平台不是 Windows,您可以使用 aiotkinter 来实现您想要的效果。我修改了您的代码,向您展示如何使用这个包:

from tkinter import *
from tkinter import messagebox
import asyncio
import random

import aiotkinter

def do_freezed():
    """Button handler used to check that the GUI still reacts to clicks."""
    reply = 'Tkinter is reacting.'
    messagebox.showinfo(message=reply)

def do_tasks():
    """Button handler: schedule ``do_urls`` and report when it finishes."""
    pending = asyncio.ensure_future(do_urls())
    # tasks_done receives the finished task as its only argument.
    pending.add_done_callback(tasks_done)

def tasks_done(task):
    """Done-callback: tell the user the workload finished (*task* is unused)."""
    finished_text = 'Tasks done.'
    messagebox.showinfo(message=finished_text)

async def one_url(url):
    """Simulate fetching one URL by sleeping 1-15 seconds.

    Returns a tab-separated description of the url and the seconds slept.
    """
    wait_for = random.randint(1, 15)
    await asyncio.sleep(wait_for)
    line = 'url: {}\tsec: {}'.format(url, wait_for)
    return line

async def do_urls():
    """Run ten simulated URL tasks concurrently and print their results.

    ``asyncio.gather`` is used because ``asyncio.wait`` rejects bare
    coroutine objects on Python 3.11+ and returns an unordered set, while
    ``gather`` schedules the coroutines itself and preserves order.
    """
    results = await asyncio.gather(*(one_url(url) for url in range(10)))
    print('\n'.join(results))

if __name__ == '__main__':
    # Install aiotkinter's policy before the loop is obtained.
    tk_policy = aiotkinter.TkinterEventLoopPolicy()
    asyncio.set_event_loop_policy(tk_policy)
    loop = asyncio.get_event_loop()

    root = Tk()
    buttonT = Button(root, command=do_tasks, text='Asyncio Tasks')
    buttonT.pack()
    buttonX = Button(root, command=do_freezed, text='Freezed???')
    buttonX.pack()

    # The asyncio loop replaces root.mainloop() here.
    loop.run_forever()

我用 multiprocessing 解决了类似的任务。

主要部分:

  1. 主进程是运行 Tk mainloop 的进程。
  2. 一个 daemon=True 的进程,其中运行执行命令的 aiohttp 服务。
  3. 内部通信使用双工 Pipe,因此每个进程都可以使用自己的一端。

此外,我使用 Tk 的虚拟事件来简化应用端的消息跟踪。您需要手动应用补丁,详情可以查看 Python 的 bug tracker。

我在两侧每 0.25 秒检查一次 Pipe

$ python --version
Python 3.7.3

main.py

import asyncio
import multiprocessing as mp

from ws import main
from app import App


class WebSocketProcess(mp.Process):
    """Child process that hosts the asyncio side of the application.

    *pipe* is the connection pair shared with the GUI process; it is passed
    unchanged to ``ws.main``. Remaining arguments go to ``mp.Process``.
    """

    def __init__(self, pipe, *args, **kw):
        super().__init__(*args, **kw)
        self.pipe = pipe

    def run(self):
        """Process entry point: run ``ws.main`` on a loop owned by this process."""
        # Create the loop explicitly rather than relying on an implicit
        # default loop in the child process.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # ws.main is declared as main(pipe, loop); the original call left
        # out the loop argument and would raise TypeError.
        loop.create_task(main(self.pipe, loop))
        try:
            loop.run_forever()
        finally:
            loop.close()


if __name__ == '__main__':
    # Both processes receive the same connection pair and each uses one end.
    pipe = mp.Pipe()
    ws_process = WebSocketProcess(pipe, daemon=True)
    ws_process.start()
    gui = App(pipe)
    gui.mainloop()

app.py

import json
import tkinter as tk


class App(tk.Tk):
    """Tk application that exchanges messages with the websocket process.

    *pipe* is the connection pair created in ``main.py``; this side keeps
    the first end. Incoming messages are polled on a timer and re-emitted
    as ``<<ws-event>>`` virtual events.
    """

    def __init__(self, pipe, *args, **kw):
        super().__init__(*args, **kw)
        self.app_pipe, _ = pipe
        # Polling period in milliseconds, as expected by ``after``.
        self.ws_check_interval = 250
        self.after(self.ws_check_interval, self.ws_check)

    def join_channel(self, channel_str):
        """Ask the websocket process to join *channel_str*."""
        self.app_pipe.send({
            'command': 'join',
            'data': {
                'channel': channel_str
            }
        })

    def ws_check(self):
        """Drain pending pipe messages into virtual events, then reschedule."""
        while self.app_pipe.poll():
            msg = self.app_pipe.recv()
            # The payload is JSON-encoded; ``json`` must be imported at
            # module level (it was missing in the original).
            self.event_generate('<<ws-event>>', data=json.dumps(msg), when='tail')
        self.after(self.ws_check_interval, self.ws_check)

ws.py

import asyncio

import aiohttp


async def read_pipe(session, ws, ws_pipe):
    """Forward commands arriving on *ws_pipe* to the websocket or HTTP API.

    Runs forever: drains every pending pipe message, then sleeps 0.25 s.
    A ``join`` command is sent over *ws*; a ``ticker`` command performs an
    HTTP GET through *session* and sends the JSON reply back down the pipe.
    """
    while True:
        while ws_pipe.poll():
            request = ws_pipe.recv()
            command = request['command']

            if command == 'join':
                # Websocket send.
                await ws.send_json(request['data'])
            elif command == 'ticker':
                # Plain HTTP request.
                async with session.get('https://example.com/api/ticker/') as response:
                    payload = await response.json()
                    ws_pipe.send({'event': 'ticker', 'data': payload})

        await asyncio.sleep(.25)


async def main(pipe, loop=None):
    """Connect the websocket and relay its text messages to the GUI pipe.

    *pipe* is the connection pair; this side uses the second end. *loop* is
    the loop used to schedule ``read_pipe``; it is now optional and defaults
    to the loop running this coroutine, so ``main(pipe)`` also works.

    The ``read_pipe`` task is cancelled once the websocket loop ends, so it
    does not keep polling with a closed session.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    _, ws_pipe = pipe
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('wss://example.com/') as ws:
            task = loop.create_task(read_pipe(session, ws, ws_pipe))
            try:
                async for msg in ws:
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        if msg.data == 'close cmd':
                            await ws.close()
                            break
                        ws_pipe.send(msg.json())
                    elif msg.type == aiohttp.WSMsgType.ERROR:
                        break
            finally:
                task.cancel()

使用 Python3.9,可以通过制作几个异步函数来完成,其中一个函数负责 Tk update()。在主循环中,ensure_future() 可用于在启动异步循环之前调用所有这些异步函数。

#!/usr/bin/env python3.9

import aioredis
import asyncio
import tkinter as tk 
import tkinter.scrolledtext as st 
import json

async def redis_main(logs):
    """Subscribe to the ``pylog`` channel and append each message to *logs*.

    Every received payload is decoded as UTF-8 JSON and its ``msg`` field is
    inserted into the text widget, followed by a newline.
    """
    connection = await aioredis.create_connection(('localhost', 6379))
    channel = aioredis.Channel('pylog', is_pattern=False)
    await connection.execute_pubsub('subscribe', channel)
    while True:
        has_message = await channel.wait_message()
        if not has_message:
            break
        raw = await channel.get()
        record = json.loads(raw.decode('utf-8'))
        logs.insert(tk.INSERT, record['msg'] + '\n')

async def tk_main(root):
    """Stand-in for ``mainloop``: call ``root.update()`` every 0.05 s, forever."""
    refresh_seconds = 0.05
    while True:
        root.update()
        await asyncio.sleep(refresh_seconds)

def on_closing():
    """Window-close handler: stop the currently running asyncio loop."""
    running_loop = asyncio.get_running_loop()
    running_loop.stop()

if __name__ == '__main__':
    # Create and install the loop before scheduling anything: calling
    # ensure_future() with no current loop is deprecated and fails on
    # recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    root = tk.Tk()
    root.protocol("WM_DELETE_WINDOW", on_closing)
    logs = st.ScrolledText(root, width=30, height=8)
    logs.grid()

    tkmain = loop.create_task(tk_main(root))
    rdmain = loop.create_task(redis_main(logs))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        tkmain.cancel()
        rdmain.cancel()
        # Run the loop once more so the cancellations are delivered;
        # return_exceptions keeps CancelledError (or an earlier task
        # failure) from propagating here.
        loop.run_until_complete(
            asyncio.gather(tkmain, rdmain, return_exceptions=True))
        loop.close()
        root.destroy()