如何使用 asyncio add_reader 从套接字读取

How to read from socket using asyncio add_reader

我有这个代码:

import sys
import socket
import asyncio

async def main(dest_addr, max_hops=30, timeout=0.5):
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()

    port = 33434

    rx = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    rx.settimeout(timeout)
    rx.bind(("", port))

    def reader():
        try:
            _, addr = rx.recvfrom(512)
            addr = addr[0]
        except socket.timeout:
            addr = None
        queue.put_nowait(addr)

    loop.add_reader(rx, reader)

    tx = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    for ttl in range(1, max_hops + 1):
        tx.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
        tx.sendto(b"", (dest_addr, port))
        addr = await queue.get()
        print(ttl, addr)

    loop.remove_reader(rx)


if __name__ == "__main__":
    dest_name = sys.argv[1]
    dest_addr = socket.gethostbyname(dest_name)
    print(f"traceroute to {dest_name} ({dest_addr})")
    asyncio.get_event_loop().run_until_complete(main(dest_addr))

我基本上是在尝试使用 asyncio 实现 traceroute。

我正在监视套接字文件描述符的读取可用性,并在使用 socket.sendto 方法后从设备接收数据时调用 reader 并等待队列被填充,然后再去下一步。

但是,我的程序在第一次迭代后挂起,在第二次 addr = await queue.get()

似乎 reader 回调只被调用一次,再也不会被调用,所以队列没有被填满,这很奇怪,因为我在 rx 套接字上有 0.5 秒的超时。

回答我自己的问题:

我认为发生的情况是,设备(例如我的前端路由器)没有任何响应,所以当文件描述符准备好读取时我永远不会收到通知,因此不会调用回调。

解决方法是用超时将 queue.get() 包装在 asyncio.wait_for 内,这样它就不会永远挂起:

for ttl in range(1, max_hops + 1):
    tx.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
    tx.sendto(b"", (dest_addr, port))
    try:
        addr = await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        addr = "timeout"
    print(ttl, addr)