How to collect wait()'d coroutines in a set?

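I have been trying to build a ping sweep that uses a limited number of processes. I tried as_completed without success, then switched to asyncio.wait with return_when=asyncio.FIRST_COMPLETED.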

The complete script below works if the offending line is commented out. I would like to collect the tasks in a set to get rid of pending = list(pending), but pending_set.union(task) raises "await wasn't used with future".

"""Test simultaneous pings, limiting processes."""
import asyncio
from time import asctime

pinglist = [
    '127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
    '192.168.177.20', '192.168.177.100', '172.17.1.1'
]


async def ping(ip):
    """Run external ping."""
    p = await asyncio.create_subprocess_exec(
        'ping', '-n', '-c', '1', ip,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL
    )
    return await p.wait()


async def run():
    """Run the test, uses some processes and will take a while."""
    iplist = pinglist[:]
    pending = []
    pending_set = set()
    tasks = {}
    while len(pending) or len(iplist):
        while len(pending) < 3 and len(iplist):
            ip = iplist.pop()
            print(f"{asctime()} adding {ip}")
            task = asyncio.create_task(ping(ip))
            tasks[task] = ip
            pending.append(task)
            pending_set.union(task)  # comment this line and no error
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        pending = list(pending)
        for taskdone in done:
            print(' '.join([
                asctime(),
                ('BAD' if taskdone.result() else 'good'),
                tasks[taskdone]
            ]))

if __name__ == '__main__':
    asyncio.run(run())

Use await asyncio.gather(*pending_set)

  • asyncio.gather() accepts any number of awaitables and itself returns an awaitable
  • * unpacks the set
    >>> "{} {} {}".format(*set((1,2,3)))
    '1 2 3'
    

Example from the docs:

await asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
)
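To make the set-unpacking idea concrete, here is a minimal sketch. The square() coroutine is a hypothetical stand-in for a real job, not something from the question. Because a set has no meaningful order, each result carries its own input so it can be matched up afterwards.

```python
import asyncio


async def square(n):
    """Hypothetical awaitable standing in for a real job."""
    await asyncio.sleep(0)
    return n, n * n


async def main():
    # Collect the tasks in a set, then unpack the set into gather().
    pending_set = {asyncio.create_task(square(n)) for n in (1, 2, 3)}
    # Set iteration order is arbitrary, so every result includes its input.
    results = await asyncio.gather(*pending_set)
    return dict(results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that gather() waits for everything at once, so on its own it does not limit how many jobs run concurrently.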

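There are two problems with pending_set.union(task):

  • union doesn't update the set in place; it returns a new set containing the elements of the original set plus those of the argument it receives.

  • It accepts an iterable collection (such as another set), not a single element. So union tries to iterate over task, which doesn't make sense. To make things more confusing, task objects are technically iterable so that they can be used in yield from expressions, but they detect iteration attempts outside an async context and report the error you observed.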
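Both points can be seen with plain sets, without any asyncio involved. This is only an illustration with arbitrary integer values. A task fails with a different message than an int does, but for the same underlying reason.

```python
# Plain-set illustration of the two problems; the values are arbitrary.
pending_set = {1, 2}

# 1. union() returns a new set and leaves the original untouched.
combined = pending_set.union([3])
print(pending_set)  # still contains only 1 and 2
print(combined)     # contains 1, 2 and 3

# 2. union() iterates over its argument, so a single non-iterable
#    element is rejected.
error = None
try:
    pending_set.union(3)
except TypeError as exc:
    error = exc
    print("union(3) failed:", exc)
```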

To fix both problems, you should use the add method instead, which works by side effect and accepts a single element to add to the set:

pending_set.add(task)
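With add, the loop from the question can keep its pending tasks in a set throughout, since asyncio.wait() already returns two sets. The following is a sketch rather than a drop-in replacement: fake_ping() is a hypothetical stand-in that sleeps instead of spawning a process, and the results and max_in_flight bookkeeping is added only to make the behaviour observable.

```python
import asyncio


async def fake_ping(ip, delay=0.01):
    """Hypothetical stand-in for ping(): sleeps, then reports success (0)."""
    await asyncio.sleep(delay)
    return 0


async def run(targets, limit=3):
    """Keep at most `limit` tasks in flight, tracking them in a set."""
    todo = list(targets)
    pending = set()
    tasks = {}      # task -> ip
    results = {}    # ip -> return code
    max_in_flight = 0
    while pending or todo:
        while len(pending) < limit and todo:
            ip = todo.pop()
            task = asyncio.create_task(fake_ping(ip))
            tasks[task] = ip
            pending.add(task)
        max_in_flight = max(max_in_flight, len(pending))
        # asyncio.wait() returns two sets, so no list() conversion is needed.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for finished in done:
            results[tasks.pop(finished)] = finished.result()
    return results, max_in_flight


if __name__ == "__main__":
    print(asyncio.run(run(["127.0.0.1", "192.168.1.10", "192.168.1.20"])))
```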

Note that the more idiomatic way to limit concurrency in asyncio is to use a Semaphore. For example (untested):

async def run():
    limit = asyncio.Semaphore(3)
    async def wait_and_ping(ip):
        async with limit:
            print(f"{asctime()} adding {ip}")
            result = await ping(ip)
        print(asctime(), ip, ('BAD' if result else 'good'))
    await asyncio.gather(*[wait_and_ping(ip) for ip in pinglist])
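To check that the semaphore really caps concurrency, here is a self-contained variant that swaps the external ping for asyncio.sleep() and records the peak number of jobs inside the semaphore. The names limited_gather, job, active and peak are made up for this sketch.

```python
import asyncio


async def limited_gather(items, limit=3, delay=0.01):
    """Run one job per item, with at most `limit` jobs inside the semaphore."""
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def job(item):
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(delay)  # placeholder for `await ping(ip)`
            active -= 1
        return item

    # gather() returns results in the order the awaitables were passed.
    results = await asyncio.gather(*(job(item) for item in items))
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(limited_gather(list(range(7)))))
```

All jobs are created up front, but only `limit` of them get past the `async with` at any one time. The rest wait on the semaphore.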

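I solved this in my original application without queueing the ping targets, which simplified things. This answer handles a list of targets that arrives gradually and uses the helpful pointers from @user4815162342. That completes the answer to the original question.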

import asyncio
import time

pinglist = ['127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
    '192.168.177.20', '192.168.177.100', '172.17.1.1']

async def worker(queue):
    limit = asyncio.Semaphore(4)    # restrict the rate of work

    async def ping(ip):
        """Run external ping."""
        async with limit:
            print(f"{time.time():.2f} starting {ip}")
            p = await asyncio.create_subprocess_exec(
                'ping', '-n', '1', ip,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL
            )
            return (ip, await p.wait())

    async def get_assign():
        return await queue.get()

    assign = {asyncio.create_task(get_assign())}
    pending = set()

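Maintaining two separate pending sets proved to be the key. One set holds the single task that receives assigned addresses; it completes each time an address arrives and has to be restarted. The other set holds the pings, each of which runs once and then completes.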

    while len(assign) + len(pending) > 0:  # stop condition
        done, pending = await asyncio.wait(
            set().union(assign, pending),
            return_when=asyncio.FIRST_COMPLETED
        )
        for job in done:
            if job in assign:
                if job.result() is None:
                    assign = set()  # for stop condition
                else:
                    pending.add(asyncio.create_task(ping(job.result())))
                    assign = {asyncio.create_task(get_assign())}
            else:
                print(
                    f"{time.time():.2f} result {job.result()[0]}"
                    f" {['good', 'BAD'][job.result()[1]]}"
                )

The rest is straightforward.

async def assign(queue):
    """Assign tasks as if these are arriving gradually."""
    print(f"{time.time():.2f} start assigning")
    for task in pinglist:
        await queue.put(task)
        await asyncio.sleep(0.1)
    await queue.put(None)           # to stop nicely

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(worker(queue), assign(queue))

if __name__ == '__main__':
    asyncio.run(main())

The output is (on my network the 172 address does not respond):

1631611141.70 start assigning
1631611141.70 starting 127.0.0.1
1631611141.71 result 127.0.0.1 good
1631611141.80 starting 192.168.1.10
1631611141.81 result 192.168.1.10 good
1631611141.91 starting 192.168.1.20
1631611142.02 starting 192.168.1.254
1631611142.03 result 192.168.1.254 good
1631611142.13 starting 192.168.177.20
1631611142.23 starting 192.168.177.100
1631611142.24 result 192.168.177.100 good
1631611142.34 starting 172.17.1.1
1631611144.47 result 192.168.1.20 good
1631611145.11 result 192.168.177.20 good
1631611145.97 result 172.17.1.1 BAD