如何检查异步循环是否有任何关联的套接字

how to check if asyncio loop has any associated sockets

asyncio.Task.all_tasks() 给出了事件循环的所有任务列表,但我找不到任何类似的套接字,特别是与循环关联的数据报套接字?

套接字和任务的缺失可能会发出 "end of life" 循环信号。

问题是,在下面的示例中,当任务集为空时,在 loop_not_empty() 中放入什么使其成为 return False 没有关联的套接字(即两秒后)

示例:

import asyncio
import socket
import threading

class Handler(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        print("connection made")

    def datagram_received(self, data, addr):
        if data == b'die':
            print("shutting down")
            self.transport.abort()

@asyncio.coroutine
def sometask():
    yield from asyncio.sleep(1)
    print("task done")

def loop_not_empty(l):
    # if asyncio.Task.all_tasks() == set() and WHAT_GOES_HERE
    #   return False
    return True

def main():
    a,b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)

    l = asyncio.get_event_loop()

    asyncio.ensure_future(sometask(), loop=l)
    asyncio.ensure_future(l.create_datagram_endpoint(Handler, sock=a), loop=l)

    threading.Timer(2, lambda: b.send(b'die')).start()

    while loop_not_empty(l):
        l.run_until_complete(asyncio.sleep(1, loop=l))

main()

这是一个解决方案,它使用简单的 class 和 asyncio.Event() 来计算活动作业的数量,并在所有作业完成后发出停止循环的信号:

import asyncio

import random


class UseCounter:
    def __init__(self, loop=None):
        self.loop = loop
        self.event = asyncio.Event(loop=loop)
        self.n = 0  # The number of active jobs

    def __enter__(self):
        self.enter()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.exit()

    def enter(self):
        self.n += 1

    def exit(self):
        self.n -= 1
        if self.n == 0:
            self.event.set()

    async def wait(self):
        return await self.event.wait()


async def my_coroutine(counter, term):
    with counter:
        print("start", term)
        n = random.uniform(0.2, 1.5)
        await asyncio.sleep(n)
        print("end", term)


loop = asyncio.get_event_loop()
counter = UseCounter(loop)
terms = ["apple", "banana", "melon"]
for term in terms:
    asyncio.ensure_future(my_coroutine(counter, term))

loop.run_until_complete(counter.wait())

loop.close()

对于上面的示例,将 .enter() 添加到 connection_made() 并将 .exit() 添加到 connection_lost()