Python 的 asyncio 中如何使用 Future 对象?
How is the Future object used in Python's asyncio?
我了解事件循环的基本概念。有一个中央循环监听一组文件描述符,如果准备好读或写,则执行相应的回调。
我们可以使用协程而不是回调,因为它们可以暂停和恢复。然而,这意味着在协程和事件循环之间应该有某种通信协议才能正常工作?
我写了一个带有协程的简单回声服务器,它会产生 fd 以及像这样 yield fd, 'read'
、yield fd, 'write'
等感兴趣的动作,然后事件循环会注册 select
相应地。回调只是恢复协程。它工作正常,我添加了下面的代码。
现在我只是想了解 await
的实际工作原理。它似乎不像我的示例代码那样产生 fds 和相应的操作,相反,它给你一个 Future
对象。那么幕后到底发生了什么?它如何与事件循环通信?
我的猜测是await async.sleep(1)
会被这样执行:
- 事件循环将执行协程并到达
async.sleep(1)
。
- 它将创建一个
Future
对象。
- 然后它将创建一个 fd,可能使用
timerfd_create
和回调来完成 Future
。
- 然后会提交给Event Loop进行监控。
await
会将 Future
对象生成给正在执行它的事件循环。
- 事件循环会将
Future
对象的回调函数设置为恢复协程。
我的意思是我可以像这样使用 Future
。但这是实际发生的事情吗?有人可以帮助我更好地理解这一点吗?
PS: timerfd_create
只是作为一个例子,因为我不明白如何在事件循环中实现定时器。出于这个问题的目的,网络 fds 也是 fin 。如果有人可以帮助我了解计时器的实现方式,那就太好了!
这是我使用协程实现的简单 Echo 服务器:
"""
Tasks are just generators or coroutines
"""
import socket
import selectors
select = selectors.DefaultSelector()
tasks_to_complete = []
def create_server(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
hostname = socket.gethostname()
s.bind((hostname, port))
s.listen(5)
print("Starting server on hostname at port %s %s" % (hostname, port))
return s
def handle_clients(s):
while True:
print("yielding for read on server %s" % id(s))
yield (s, 'read')
c, a = s.accept()
t = handle_client(c)
print("appending a client handler")
tasks_to_complete.append(t)
def handle_client(c):
while True:
print("yielding for read client %s" % id(c))
yield (c, 'read')
data = c.recv(1024)
if len(data) == 0:
return "Connection Closed"
print("yielding for write on client %s" % id(c))
yield (c, 'write')
c.send(bytes(data))
def run(tasks_to_complete):
while True:
while tasks_to_complete:
t = tasks_to_complete.pop(0)
try:
fd, event = t.send(None)
if event == 'read':
event = selectors.EVENT_READ
elif event == 'write':
event = selectors.EVENT_WRITE
def context_callback(fd, t):
def callback():
select.unregister(fd)
tasks_to_complete.append(t)
return callback
select.register(fd, event, context_callback(fd, t))
except StopIteration as e:
print(e.value)
events = select.select()
for key, mask in events:
callback = key.data
callback()
tasks_to_complete.append(handle_clients(create_server(9000)))
run(tasks_to_complete)
I wrote a simple Echo Server with co-routines, which would yield the fd along with the interested action like this yield fd, 'read'
, yield fd, 'write'
etc and then the Event Loop would register the select
accordingly.
这类似于 Dave Beazley curio works. To learn more about the concept, see this lecture 从最基础的地方构建事件循环的方式。 (他使用 3.5 之前的 yield from
语法,但它的工作原理与 await
完全相同。)
如您所见,asyncio 的工作原理略有不同,但原理仍然相似。
Now I am just trying to understand how await
actually works. It doesn't seem to yield fds, and string corresponding to the action like the above example, instead it gives you a Future
object. So what exactly happens under-the-hood? How is it communicating with the Event Loop?
简短的版本是阻塞协程使用一个全局变量(通过asyncio.get_event_loop()
)来获取事件循环。事件循环有一些方法可以安排在有趣的事件发生时调用回调。 asyncio.sleep
calls loop.call_later
以确保在超时结束时恢复。
yield 的 Future
只是事件循环准备好后通知结果的一种便捷方式,以便它可以正确恢复 Task
(协程驱动事件循环)等待阻塞操作,同时还处理异常和取消。参见 Task.__step
for the gory details。
timerfd_create
was just taken as an example because I couldn't understand how timers can be implemented in an Event Loop.
计时器已实现,以便事件循环跟踪文件描述符和超时,并且 issues a select
that terminates when the earliest timeout 已过去。上面链接的 Dave 的讲座简洁地演示了这个概念。
我了解事件循环的基本概念。有一个中央循环监听一组文件描述符,如果准备好读或写,则执行相应的回调。
我们可以使用协程而不是回调,因为它们可以暂停和恢复。然而,这意味着在协程和事件循环之间应该有某种通信协议才能正常工作?
我写了一个带有协程的简单回声服务器,它会产生 fd 以及像这样 yield fd, 'read'
、yield fd, 'write'
等感兴趣的动作,然后事件循环会注册 select
相应地。回调只是恢复协程。它工作正常,我添加了下面的代码。
现在我只是想了解 await
的实际工作原理。它似乎不像我的示例代码那样产生 fds 和相应的操作,相反,它给你一个 Future
对象。那么幕后到底发生了什么?它如何与事件循环通信?
我的猜测是await async.sleep(1)
会被这样执行:
- 事件循环将执行协程并到达
async.sleep(1)
。 - 它将创建一个
Future
对象。 - 然后它将创建一个 fd,可能使用
timerfd_create
和回调来完成Future
。 - 然后会提交给Event Loop进行监控。
await
会将Future
对象生成给正在执行它的事件循环。- 事件循环会将
Future
对象的回调函数设置为恢复协程。
我的意思是我可以像这样使用 Future
。但这是实际发生的事情吗?有人可以帮助我更好地理解这一点吗?
PS: timerfd_create
只是作为一个例子,因为我不明白如何在事件循环中实现定时器。出于这个问题的目的,网络 fds 也是 fin 。如果有人可以帮助我了解计时器的实现方式,那就太好了!
这是我使用协程实现的简单 Echo 服务器:
"""
Tasks are just generators or coroutines
"""
import socket
import selectors
select = selectors.DefaultSelector()
tasks_to_complete = []
def create_server(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
hostname = socket.gethostname()
s.bind((hostname, port))
s.listen(5)
print("Starting server on hostname at port %s %s" % (hostname, port))
return s
def handle_clients(s):
while True:
print("yielding for read on server %s" % id(s))
yield (s, 'read')
c, a = s.accept()
t = handle_client(c)
print("appending a client handler")
tasks_to_complete.append(t)
def handle_client(c):
while True:
print("yielding for read client %s" % id(c))
yield (c, 'read')
data = c.recv(1024)
if len(data) == 0:
return "Connection Closed"
print("yielding for write on client %s" % id(c))
yield (c, 'write')
c.send(bytes(data))
def run(tasks_to_complete):
while True:
while tasks_to_complete:
t = tasks_to_complete.pop(0)
try:
fd, event = t.send(None)
if event == 'read':
event = selectors.EVENT_READ
elif event == 'write':
event = selectors.EVENT_WRITE
def context_callback(fd, t):
def callback():
select.unregister(fd)
tasks_to_complete.append(t)
return callback
select.register(fd, event, context_callback(fd, t))
except StopIteration as e:
print(e.value)
events = select.select()
for key, mask in events:
callback = key.data
callback()
tasks_to_complete.append(handle_clients(create_server(9000)))
run(tasks_to_complete)
I wrote a simple Echo Server with co-routines, which would yield the fd along with the interested action like this
yield fd, 'read'
,yield fd, 'write'
etc and then the Event Loop would register theselect
accordingly.
这类似于 Dave Beazley curio works. To learn more about the concept, see this lecture 从最基础的地方构建事件循环的方式。 (他使用 3.5 之前的 yield from
语法,但它的工作原理与 await
完全相同。)
如您所见,asyncio 的工作原理略有不同,但原理仍然相似。
Now I am just trying to understand how
await
actually works. It doesn't seem to yield fds, and string corresponding to the action like the above example, instead it gives you aFuture
object. So what exactly happens under-the-hood? How is it communicating with the Event Loop?
简短的版本是阻塞协程使用一个全局变量(通过asyncio.get_event_loop()
)来获取事件循环。事件循环有一些方法可以安排在有趣的事件发生时调用回调。 asyncio.sleep
calls loop.call_later
以确保在超时结束时恢复。
yield 的 Future
只是事件循环准备好后通知结果的一种便捷方式,以便它可以正确恢复 Task
(协程驱动事件循环)等待阻塞操作,同时还处理异常和取消。参见 Task.__step
for the gory details。
timerfd_create
was just taken as an example because I couldn't understand how timers can be implemented in an Event Loop.
计时器已实现,以便事件循环跟踪文件描述符和超时,并且 issues a select
that terminates when the earliest timeout 已过去。上面链接的 Dave 的讲座简洁地演示了这个概念。