asyncio 中的事件循环溢出。虽然一次添加 4 次执行,但它正在超载

Event loop in asyncio is overflowing. Though adding 4 executions at a time it is overloading

在下面的代码中,我为单个循环调用了 getSUEPEvent() 函数 4 次。我为接下来的 4 再次重新启动循环。仍然执行继续添加到循环中。如果循环是全局的,那么任何人都可以提出另一种策略来通过分组或任何其他方式运行 n 次。

def SEUPCustomers(featurecode,threshholdTime):
    # headers = buildHeaders()
    with open("ActiveCustomers.csv","r") as f:
        SEUPCustomersList = []
        csvReader = csv.reader(f)
        tasks = []
        for row in csvReader:
            tasks.append(asyncio.ensure_future(getSEUPEvents(featurecode,row,threshholdTime,SEUPCustomersList)))

        for task in range(0,len(tasks),4):
            loop = asyncio.get_event_loop()
            loop.run_until_complete(asyncio.wait(tasks[task:task+4]))
            loop.close()

ensure_futurerun_until_complete 没有按照您期望的方式工作。这是他们的工作:

  • ensure_future 在主循环中将 awaitable 调度到 运行,有效地创建了可以称为 "background task" 的东西,只要 运行主循环 运行s;

  • run_until_complete 将给定的等待对象提交给事件循环,然后 运行 事件循环 直到特定的未来完成。

因此,如果您向事件循环提交 100 个任务,然后使用 run_until_complete 等待其中一个任务完成,则循环将 运行 所有 100 个任务,并在完成任务后停止完成你感兴趣的完成。

要编写您想要的代码,您可以简单地避免 ensure_future 步骤:

def SEUPCustomers(featurecode,threshholdTime):
    # headers = buildHeaders()
    with open("ActiveCustomers.csv","r") as f:
        SEUPCustomersList = []
        csvReader = csv.reader(f)
        coros = []
        for row in csvReader:
            coros.append(getSEUPEvents(featurecode,row,threshholdTime,SEUPCustomersList))

        loop = asyncio.get_event_loop()
        for i in range(0,len(coros),4):
            loop.run_until_complete(asyncio.wait(coros[i:i+4]))

此外,如果您打算稍后使用循环,loop.close() 是不正确的。如果您完全调用 loop.close(),则应该在完全完成事件循环后调用它。