在不冻结 GUI 的情况下一起使用 asyncio 和 Tkinter(或另一个 GUI 库)
Use asyncio and Tkinter (or another GUI lib) together without freezing the GUI
我想使用 asyncio
in combination with a tkinter
GUI。
我刚接触asyncio
,对它的理解不是很详细。
单击第一个按钮时,此处的示例将启动 10 个任务。该任务只是模拟 sleep()
工作几秒钟。
示例代码 运行与 Python 3.6.4rc1
相得益彰。 但是
问题 是GUI 被冻结了。当我按下第一个按钮并启动 10 个异步任务时,在完成所有任务之前,我无法按下 GUI 中的第二个按钮。 GUI 永远不应该冻结 - 这是我的目标。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from tkinter import *
from tkinter import messagebox
import asyncio
import random
def do_freezed():
""" Button-Event-Handler to see if a button on GUI works. """
messagebox.showinfo(message='Tkinter is reacting.')
def do_tasks():
""" Button-Event-Handler starting the asyncio part. """
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(do_urls())
finally:
loop.close()
async def one_url(url):
""" One task. """
sec = random.randint(1, 15)
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [
one_url(url)
for url in range(10)
]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
if __name__ == '__main__':
root = Tk()
buttonT = Button(master=root, text='Asyncio Tasks', command=do_tasks)
buttonT.pack()
buttonX = Button(master=root, text='Freezed???', command=do_freezed)
buttonX.pack()
root.mainloop()
一个_side问题
...是因为这个错误,我无法再次 运行 任务。
Exception in Tkinter callback
Traceback (most recent call last):
File "/usr/lib/python3.6/tkinter/__init__.py", line 1699, in __call__
return self.func(*args)
File "./tk_simple.py", line 17, in do_tasks
loop.run_until_complete(do_urls())
File "/usr/lib/python3.6/asyncio/base_events.py", line 443, in run_until_complete
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
多线程
多线程是一种可能的解决方案吗?只有两个线程 - 每个循环都有自己的线程?
编辑:在查看了这个问题及其答案后,它几乎与所有 GUI 库(例如 PygObject/Gtk、wxWidgets、Qt 等)有关。
同时尝试 运行 两个事件循环是一个可疑的提议。但是,由于 root.mainloop 只是重复调用 root.update,因此可以通过将更新作为异步任务重复调用来模拟主循环。这是一个这样做的测试程序。我假设将 asyncio 任务添加到 tkinter 任务中会起作用。我用 3.7.0a2.
检查过它仍然 运行s
"""Proof of concept: integrate tkinter, asyncio and async iterator.
Terry Jan Reedy, 2016 July 25
"""
import asyncio
from random import randrange as rr
import tkinter as tk
class App(tk.Tk):
def __init__(self, loop, interval=1/120):
super().__init__()
self.loop = loop
self.protocol("WM_DELETE_WINDOW", self.close)
self.tasks = []
self.tasks.append(loop.create_task(self.rotator(1/60, 2)))
self.tasks.append(loop.create_task(self.updater(interval)))
async def rotator(self, interval, d_per_tick):
canvas = tk.Canvas(self, height=600, width=600)
canvas.pack()
deg = 0
color = 'black'
arc = canvas.create_arc(100, 100, 500, 500, style=tk.CHORD,
start=0, extent=deg, fill=color)
while await asyncio.sleep(interval, True):
deg, color = deg_color(deg, d_per_tick, color)
canvas.itemconfigure(arc, extent=deg, fill=color)
async def updater(self, interval):
while True:
self.update()
await asyncio.sleep(interval)
def close(self):
for task in self.tasks:
task.cancel()
self.loop.stop()
self.destroy()
def deg_color(deg, d_per_tick, color):
deg += d_per_tick
if 360 <= deg:
deg %= 360
color = '#%02x%02x%02x' % (rr(0, 256), rr(0, 256), rr(0, 256))
return deg, color
loop = asyncio.get_event_loop()
app = App(loop)
loop.run_forever()
loop.close()
随着时间间隔的减少,tk 更新开销和时间分辨率都会增加。对于 gui 更新,相对于动画,每秒 20 次可能就足够了。
我最近成功 运行ning 包含 tkinter 调用的异步 def 协同程序并等待 mainloop。原型使用 asyncio Tasks 和 Futures,但我不知道添加普通的 asyncio 任务是否可行。如果有人想 运行 asyncio 和 tkinter 一起执行任务,我认为 运行ning tk update with asyncio loop 是一个更好的主意。
编辑:至少如上所用,没有 async def 协程的异常会杀死协程,但在某处被捕获并丢弃。静默错误非常令人讨厌。
EDIT2:附加代码和注释
https://bugs.python.org/issue27546
您可以在按下 Button
后通过在正确的位置添加对 root.update_idletasks()
的调用来保持 GUI 活动:
from tkinter import *
from tkinter import messagebox
import asyncio
import random
def do_freezed():
""" Button-Event-Handler to see if a button on GUI works. """
messagebox.showinfo(message='Tkinter is reacting.')
def do_tasks():
""" Button-Event-Handler starting the asyncio part. """
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(do_urls())
finally:
loop.close()
async def one_url(url):
""" One task. """
sec = random.randint(1, 15)
root.update_idletasks() # ADDED: Allow tkinter to update gui.
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [one_url(url) for url in range(10)]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
if __name__ == '__main__':
root = Tk()
buttonT = Button(master=root, text='Asyncio Tasks', command=do_tasks)
buttonT.pack()
buttonX = Button(master=root, text='Freezed???', command=do_freezed)
buttonX.pack()
root.mainloop()
在对您的代码稍作修改后,我在主线程中创建了 asyncio event_loop
并将其作为参数传递给 asyncio 线程。现在 Tkinter 在获取 url 时不会冻结。
from tkinter import *
from tkinter import messagebox
import asyncio
import threading
import random
def _asyncio_thread(async_loop):
async_loop.run_until_complete(do_urls())
def do_tasks(async_loop):
""" Button-Event-Handler starting the asyncio part. """
threading.Thread(target=_asyncio_thread, args=(async_loop,)).start()
async def one_url(url):
""" One task. """
sec = random.randint(1, 8)
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [one_url(url) for url in range(10)]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
def do_freezed():
messagebox.showinfo(message='Tkinter is reacting.')
def main(async_loop):
root = Tk()
Button(master=root, text='Asyncio Tasks', command= lambda:do_tasks(async_loop)).pack()
Button(master=root, text='Freezed???', command=do_freezed).pack()
root.mainloop()
if __name__ == '__main__':
async_loop = asyncio.get_event_loop()
main(async_loop)
我很幸运 运行 在另一个线程上有一个 I/O 循环,在应用程序创建之初开始,并使用 asyncio.run_coroutine_threadsafe(..)
将任务扔到它上面。
令我感到惊讶的是,我可以在另一个 asyncio 上更改 tkinter 小部件 loop/thread,也许它对我有用只是侥幸——但它确实有效。
请注意,当 asyncio 任务正在进行时,other 按钮仍然有效并有响应。我总是喜欢另一个按钮上的 disable/enable 东西,这样你就不会意外触发多个任务,但这只是一个 UI 东西。
import threading
from functools import partial
from tkinter import *
from tkinter import messagebox
import asyncio
import random
# Please wrap all this code in a nice App class, of course
def _run_aio_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
aioloop = asyncio.new_event_loop()
t = threading.Thread(target=partial(_run_aio_loop, aioloop))
t.daemon = True # Optional depending on how you plan to shutdown the app
t.start()
buttonT = None
def do_freezed():
""" Button-Event-Handler to see if a button on GUI works. """
messagebox.showinfo(message='Tkinter is reacting.')
def do_tasks():
""" Button-Event-Handler starting the asyncio part. """
buttonT.configure(state=DISABLED)
asyncio.run_coroutine_threadsafe(do_urls(), aioloop)
async def one_url(url):
""" One task. """
sec = random.randint(1, 3)
# root.update_idletasks() # We can delete this now
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [one_url(url) for url in range(3)]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
buttonT.configure(state=NORMAL) # Tk doesn't seem to care that this is called on another thread
if __name__ == '__main__':
root = Tk()
buttonT = Button(master=root, text='Asyncio Tasks', command=do_tasks)
buttonT.pack()
buttonX = Button(master=root, text='Freezed???', command=do_freezed)
buttonX.pack()
root.mainloop()
我来晚了一点,但如果您没有目标 Windows,您可以使用 aiotkinter 来实现您想要的。我修改了您的代码以向您展示如何使用此包:
from tkinter import *
from tkinter import messagebox
import asyncio
import random
import aiotkinter
def do_freezed():
""" Button-Event-Handler to see if a button on GUI works. """
messagebox.showinfo(message='Tkinter is reacting.')
def do_tasks():
task = asyncio.ensure_future(do_urls())
task.add_done_callback(tasks_done)
def tasks_done(task):
messagebox.showinfo(message='Tasks done.')
async def one_url(url):
""" One task. """
sec = random.randint(1, 15)
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [
one_url(url)
for url in range(10)
]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
if __name__ == '__main__':
asyncio.set_event_loop_policy(aiotkinter.TkinterEventLoopPolicy())
loop = asyncio.get_event_loop()
root = Tk()
buttonT = Button(master=root, text='Asyncio Tasks', command=do_tasks)
buttonT.pack()
buttonX = Button(master=root, text='Freezed???', command=do_freezed)
buttonX.pack()
loop.run_forever()
我用 multiprocessing
解决了类似的任务。
主要部分:
- 主进程是
Tk
和 mainloop
的进程。
daemon=True
process with aiohttp
service that executes commands.
- 内部通信使用双工
Pipe
所以每个进程都可以使用它的结束。
此外,我正在制作 Tk 的虚拟事件以简化应用端的按摩跟踪。您需要apply patch manually. You can check python's bug tracker了解详情。
我在两侧每 0.25 秒检查一次 Pipe
。
$ python --version
Python 3.7.3
main.py
import asyncio
import multiprocessing as mp
from ws import main
from app import App
class WebSocketProcess(mp.Process):
def __init__(self, pipe, *args, **kw):
super().__init__(*args, **kw)
self.pipe = pipe
def run(self):
loop = asyncio.get_event_loop()
loop.create_task(main(self.pipe))
loop.run_forever()
if __name__ == '__main__':
pipe = mp.Pipe()
WebSocketProcess(pipe, daemon=True).start()
App(pipe).mainloop()
app.py
import tkinter as tk
class App(tk.Tk):
def __init__(self, pipe, *args, **kw):
super().__init__(*args, **kw)
self.app_pipe, _ = pipe
self.ws_check_interval = 250;
self.after(self.ws_check_interval, self.ws_check)
def join_channel(self, channel_str):
self.app_pipe.send({
'command': 'join',
'data': {
'channel': channel_str
}
})
def ws_check(self):
while self.app_pipe.poll():
msg = self.app_pipe.recv()
self.event_generate('<<ws-event>>', data=json.dumps(msg), when='tail')
self.after(self.ws_check_interval, self.ws_check)
ws.py
import asyncio
import aiohttp
async def read_pipe(session, ws, ws_pipe):
while True:
while ws_pipe.poll():
msg = ws_pipe.recv()
# web socket send
if msg['command'] == 'join':
await ws.send_json(msg['data'])
# html request
elif msg['command'] == 'ticker':
async with session.get('https://example.com/api/ticker/') as response:
ws_pipe.send({'event': 'ticker', 'data': await response.json()})
await asyncio.sleep(.25)
async def main(pipe, loop):
_, ws_pipe = pipe
async with aiohttp.ClientSession() as session:
async with session.ws_connect('wss://example.com/') as ws:
task = loop.create_task(read_pipe(session, ws, ws_pipe))
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close cmd':
await ws.close()
break
ws_pipe.send(msg.json())
elif msg.type == aiohttp.WSMsgType.ERROR:
break
使用 Python3.9,可以通过制作几个异步函数来完成,其中一个函数负责 Tk update()。在主循环中,ensure_future() 可用于在启动异步循环之前调用所有这些异步函数。
#!/usr/bin/env python3.9
import aioredis
import asyncio
import tkinter as tk
import tkinter.scrolledtext as st
import json
async def redis_main(logs):
redisS = await aioredis.create_connection(('localhost', 6379))
subCh = aioredis.Channel('pylog', is_pattern=False)
await redisS.execute_pubsub('subscribe', subCh)
while await subCh.wait_message():
msg = await subCh.get()
jmsg = json.loads(msg.decode('utf-8'))
logs.insert(tk.INSERT, jmsg['msg'] + '\n')
async def tk_main(root):
while True:
root.update()
await asyncio.sleep(0.05)
def on_closing():
asyncio.get_running_loop().stop()
if __name__ == '__main__':
root = tk.Tk()
root.protocol("WM_DELETE_WINDOW", on_closing)
logs = st.ScrolledText(root, width=30, height=8)
logs.grid()
tkmain = asyncio.ensure_future(tk_main(root))
rdmain = asyncio.ensure_future(redis_main(logs))
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except KeyboardInterrupt:
pass
tkmain.cancel()
rdmain.cancel()
我想使用 asyncio
in combination with a tkinter
GUI。
我刚接触asyncio
,对它的理解不是很详细。
单击第一个按钮时,此处的示例将启动 10 个任务。该任务只是模拟 sleep()
工作几秒钟。
示例代码 运行与 Python 3.6.4rc1
相得益彰。 但是
问题 是GUI 被冻结了。当我按下第一个按钮并启动 10 个异步任务时,在完成所有任务之前,我无法按下 GUI 中的第二个按钮。 GUI 永远不应该冻结 - 这是我的目标。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from tkinter import *
from tkinter import messagebox
import asyncio
import random
def do_freezed():
""" Button-Event-Handler to see if a button on GUI works. """
messagebox.showinfo(message='Tkinter is reacting.')
def do_tasks():
""" Button-Event-Handler starting the asyncio part. """
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(do_urls())
finally:
loop.close()
async def one_url(url):
""" One task. """
sec = random.randint(1, 15)
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [
one_url(url)
for url in range(10)
]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
if __name__ == '__main__':
root = Tk()
buttonT = Button(master=root, text='Asyncio Tasks', command=do_tasks)
buttonT.pack()
buttonX = Button(master=root, text='Freezed???', command=do_freezed)
buttonX.pack()
root.mainloop()
一个_side问题
...是因为这个错误,我无法再次 运行 任务。
Exception in Tkinter callback
Traceback (most recent call last):
File "/usr/lib/python3.6/tkinter/__init__.py", line 1699, in __call__
return self.func(*args)
File "./tk_simple.py", line 17, in do_tasks
loop.run_until_complete(do_urls())
File "/usr/lib/python3.6/asyncio/base_events.py", line 443, in run_until_complete
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
多线程
多线程是一种可能的解决方案吗?只有两个线程 - 每个循环都有自己的线程?
编辑:在查看了这个问题及其答案后,它几乎与所有 GUI 库(例如 PygObject/Gtk、wxWidgets、Qt 等)有关。
同时尝试 运行 两个事件循环是一个可疑的提议。但是,由于 root.mainloop 只是重复调用 root.update,因此可以通过将更新作为异步任务重复调用来模拟主循环。这是一个这样做的测试程序。我假设将 asyncio 任务添加到 tkinter 任务中会起作用。我用 3.7.0a2.
检查过它仍然 运行s"""Proof of concept: integrate tkinter, asyncio and async iterator.
Terry Jan Reedy, 2016 July 25
"""
import asyncio
from random import randrange as rr
import tkinter as tk
class App(tk.Tk):
def __init__(self, loop, interval=1/120):
super().__init__()
self.loop = loop
self.protocol("WM_DELETE_WINDOW", self.close)
self.tasks = []
self.tasks.append(loop.create_task(self.rotator(1/60, 2)))
self.tasks.append(loop.create_task(self.updater(interval)))
async def rotator(self, interval, d_per_tick):
canvas = tk.Canvas(self, height=600, width=600)
canvas.pack()
deg = 0
color = 'black'
arc = canvas.create_arc(100, 100, 500, 500, style=tk.CHORD,
start=0, extent=deg, fill=color)
while await asyncio.sleep(interval, True):
deg, color = deg_color(deg, d_per_tick, color)
canvas.itemconfigure(arc, extent=deg, fill=color)
async def updater(self, interval):
while True:
self.update()
await asyncio.sleep(interval)
def close(self):
for task in self.tasks:
task.cancel()
self.loop.stop()
self.destroy()
def deg_color(deg, d_per_tick, color):
deg += d_per_tick
if 360 <= deg:
deg %= 360
color = '#%02x%02x%02x' % (rr(0, 256), rr(0, 256), rr(0, 256))
return deg, color
loop = asyncio.get_event_loop()
app = App(loop)
loop.run_forever()
loop.close()
随着时间间隔的减少,tk 更新开销和时间分辨率都会增加。对于 gui 更新,相对于动画,每秒 20 次可能就足够了。
我最近成功 运行ning 包含 tkinter 调用的异步 def 协同程序并等待 mainloop。原型使用 asyncio Tasks 和 Futures,但我不知道添加普通的 asyncio 任务是否可行。如果有人想 运行 asyncio 和 tkinter 一起执行任务,我认为 运行ning tk update with asyncio loop 是一个更好的主意。
编辑:至少如上所用,没有 async def 协程的异常会杀死协程,但在某处被捕获并丢弃。静默错误非常令人讨厌。
EDIT2:附加代码和注释 https://bugs.python.org/issue27546
您可以在按下 Button
后通过在正确的位置添加对 root.update_idletasks()
的调用来保持 GUI 活动:
from tkinter import *
from tkinter import messagebox
import asyncio
import random
def do_freezed():
""" Button-Event-Handler to see if a button on GUI works. """
messagebox.showinfo(message='Tkinter is reacting.')
def do_tasks():
""" Button-Event-Handler starting the asyncio part. """
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(do_urls())
finally:
loop.close()
async def one_url(url):
""" One task. """
sec = random.randint(1, 15)
root.update_idletasks() # ADDED: Allow tkinter to update gui.
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [one_url(url) for url in range(10)]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
if __name__ == '__main__':
root = Tk()
buttonT = Button(master=root, text='Asyncio Tasks', command=do_tasks)
buttonT.pack()
buttonX = Button(master=root, text='Freezed???', command=do_freezed)
buttonX.pack()
root.mainloop()
在对您的代码稍作修改后,我在主线程中创建了 asyncio event_loop
并将其作为参数传递给 asyncio 线程。现在 Tkinter 在获取 url 时不会冻结。
from tkinter import *
from tkinter import messagebox
import asyncio
import threading
import random
def _asyncio_thread(async_loop):
async_loop.run_until_complete(do_urls())
def do_tasks(async_loop):
""" Button-Event-Handler starting the asyncio part. """
threading.Thread(target=_asyncio_thread, args=(async_loop,)).start()
async def one_url(url):
""" One task. """
sec = random.randint(1, 8)
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [one_url(url) for url in range(10)]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
def do_freezed():
messagebox.showinfo(message='Tkinter is reacting.')
def main(async_loop):
root = Tk()
Button(master=root, text='Asyncio Tasks', command= lambda:do_tasks(async_loop)).pack()
Button(master=root, text='Freezed???', command=do_freezed).pack()
root.mainloop()
if __name__ == '__main__':
async_loop = asyncio.get_event_loop()
main(async_loop)
我很幸运 运行 在另一个线程上有一个 I/O 循环,在应用程序创建之初开始,并使用 asyncio.run_coroutine_threadsafe(..)
将任务扔到它上面。
令我感到惊讶的是,我可以在另一个 asyncio 上更改 tkinter 小部件 loop/thread,也许它对我有用只是侥幸——但它确实有效。
请注意,当 asyncio 任务正在进行时,other 按钮仍然有效并有响应。我总是喜欢另一个按钮上的 disable/enable 东西,这样你就不会意外触发多个任务,但这只是一个 UI 东西。
import threading
from functools import partial
from tkinter import *
from tkinter import messagebox
import asyncio
import random
# Please wrap all this code in a nice App class, of course
def _run_aio_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
aioloop = asyncio.new_event_loop()
t = threading.Thread(target=partial(_run_aio_loop, aioloop))
t.daemon = True # Optional depending on how you plan to shutdown the app
t.start()
buttonT = None
def do_freezed():
""" Button-Event-Handler to see if a button on GUI works. """
messagebox.showinfo(message='Tkinter is reacting.')
def do_tasks():
""" Button-Event-Handler starting the asyncio part. """
buttonT.configure(state=DISABLED)
asyncio.run_coroutine_threadsafe(do_urls(), aioloop)
async def one_url(url):
""" One task. """
sec = random.randint(1, 3)
# root.update_idletasks() # We can delete this now
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [one_url(url) for url in range(3)]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
buttonT.configure(state=NORMAL) # Tk doesn't seem to care that this is called on another thread
if __name__ == '__main__':
root = Tk()
buttonT = Button(master=root, text='Asyncio Tasks', command=do_tasks)
buttonT.pack()
buttonX = Button(master=root, text='Freezed???', command=do_freezed)
buttonX.pack()
root.mainloop()
我来晚了一点,但如果您没有目标 Windows,您可以使用 aiotkinter 来实现您想要的。我修改了您的代码以向您展示如何使用此包:
from tkinter import *
from tkinter import messagebox
import asyncio
import random
import aiotkinter
def do_freezed():
""" Button-Event-Handler to see if a button on GUI works. """
messagebox.showinfo(message='Tkinter is reacting.')
def do_tasks():
task = asyncio.ensure_future(do_urls())
task.add_done_callback(tasks_done)
def tasks_done(task):
messagebox.showinfo(message='Tasks done.')
async def one_url(url):
""" One task. """
sec = random.randint(1, 15)
await asyncio.sleep(sec)
return 'url: {}\tsec: {}'.format(url, sec)
async def do_urls():
""" Creating and starting 10 tasks. """
tasks = [
one_url(url)
for url in range(10)
]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('\n'.join(results))
if __name__ == '__main__':
asyncio.set_event_loop_policy(aiotkinter.TkinterEventLoopPolicy())
loop = asyncio.get_event_loop()
root = Tk()
buttonT = Button(master=root, text='Asyncio Tasks', command=do_tasks)
buttonT.pack()
buttonX = Button(master=root, text='Freezed???', command=do_freezed)
buttonX.pack()
loop.run_forever()
我用 multiprocessing
解决了类似的任务。
主要部分:
- 主进程是
Tk
和mainloop
的进程。 daemon=True
process withaiohttp
service that executes commands.- 内部通信使用双工
Pipe
所以每个进程都可以使用它的结束。
此外,我正在制作 Tk 的虚拟事件以简化应用端的按摩跟踪。您需要apply patch manually. You can check python's bug tracker了解详情。
我在两侧每 0.25 秒检查一次 Pipe
。
$ python --version
Python 3.7.3
main.py
import asyncio
import multiprocessing as mp
from ws import main
from app import App
class WebSocketProcess(mp.Process):
def __init__(self, pipe, *args, **kw):
super().__init__(*args, **kw)
self.pipe = pipe
def run(self):
loop = asyncio.get_event_loop()
loop.create_task(main(self.pipe))
loop.run_forever()
if __name__ == '__main__':
pipe = mp.Pipe()
WebSocketProcess(pipe, daemon=True).start()
App(pipe).mainloop()
app.py
import tkinter as tk
class App(tk.Tk):
def __init__(self, pipe, *args, **kw):
super().__init__(*args, **kw)
self.app_pipe, _ = pipe
self.ws_check_interval = 250;
self.after(self.ws_check_interval, self.ws_check)
def join_channel(self, channel_str):
self.app_pipe.send({
'command': 'join',
'data': {
'channel': channel_str
}
})
def ws_check(self):
while self.app_pipe.poll():
msg = self.app_pipe.recv()
self.event_generate('<<ws-event>>', data=json.dumps(msg), when='tail')
self.after(self.ws_check_interval, self.ws_check)
ws.py
import asyncio
import aiohttp
async def read_pipe(session, ws, ws_pipe):
while True:
while ws_pipe.poll():
msg = ws_pipe.recv()
# web socket send
if msg['command'] == 'join':
await ws.send_json(msg['data'])
# html request
elif msg['command'] == 'ticker':
async with session.get('https://example.com/api/ticker/') as response:
ws_pipe.send({'event': 'ticker', 'data': await response.json()})
await asyncio.sleep(.25)
async def main(pipe, loop):
_, ws_pipe = pipe
async with aiohttp.ClientSession() as session:
async with session.ws_connect('wss://example.com/') as ws:
task = loop.create_task(read_pipe(session, ws, ws_pipe))
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close cmd':
await ws.close()
break
ws_pipe.send(msg.json())
elif msg.type == aiohttp.WSMsgType.ERROR:
break
使用 Python3.9,可以通过制作几个异步函数来完成,其中一个函数负责 Tk update()。在主循环中,ensure_future() 可用于在启动异步循环之前调用所有这些异步函数。
#!/usr/bin/env python3.9
import aioredis
import asyncio
import tkinter as tk
import tkinter.scrolledtext as st
import json
async def redis_main(logs):
redisS = await aioredis.create_connection(('localhost', 6379))
subCh = aioredis.Channel('pylog', is_pattern=False)
await redisS.execute_pubsub('subscribe', subCh)
while await subCh.wait_message():
msg = await subCh.get()
jmsg = json.loads(msg.decode('utf-8'))
logs.insert(tk.INSERT, jmsg['msg'] + '\n')
async def tk_main(root):
while True:
root.update()
await asyncio.sleep(0.05)
def on_closing():
asyncio.get_running_loop().stop()
if __name__ == '__main__':
root = tk.Tk()
root.protocol("WM_DELETE_WINDOW", on_closing)
logs = st.ScrolledText(root, width=30, height=8)
logs.grid()
tkmain = asyncio.ensure_future(tk_main(root))
rdmain = asyncio.ensure_future(redis_main(logs))
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except KeyboardInterrupt:
pass
tkmain.cancel()
rdmain.cancel()