When is the right time to call loop.close()?

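I have been experimenting with asyncio for a while now and have read the PEPs, a few tutorials, and even the O'Reilly book.

I think I have got the hang of it, but I am still puzzled by the behavior of loop.close(), and I can't quite figure out when it is "safe" to call it.

Distilled to its simplest, my use case is a bunch of blocking "old school" calls, which I wrap in run_in_executor() and an outer coroutine; if any of those calls goes wrong, I want to stop progress, cancel the ones still outstanding, print a sensible log, and then (hopefully, cleanly) get out of the way.

Say, something like this: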

import asyncio
import time


def blocking(num):
    time.sleep(num)
    if num == 2:
        raise ValueError("don't like 2")
    return num


async def my_coro(loop, num):
    try:
        result = await loop.run_in_executor(None, blocking, num)
        print(f"Coro {num} done")
        return result
    except asyncio.CancelledError:
        # Do some cleanup here.
        print(f"man, I was canceled: {num}")


def main():
    loop = asyncio.get_event_loop()
    tasks = []
    for num in range(5):
        tasks.append(loop.create_task(my_coro(loop, num)))

    try:
        # No point in waiting; if any of the tasks go wrong, I
        # just want to abandon everything. The ALL_DONE is not
        # a good solution here.
        future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        done, pending = loop.run_until_complete(future)
        if pending:
            print(f"Still {len(pending)} tasks pending")
            # I tried putting a stop() - with/without a run_forever()
            # after the for - same exception raised.
            #  loop.stop()
            for future in pending:
                future.cancel()

        for task in done:
            res = task.result()
            print("Task returned", res)
    except ValueError as error:
        print("Outer except --", error)
    finally:
        # I also tried placing the run_forever() here,
        # before the stop() - no dice.
        loop.stop()
        if pending:
            print("Waiting for pending futures to finish...")
            loop.run_forever()
        loop.close()

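I tried several variants of the stop() and run_forever() calls. "run_forever() first, then stop()" seems to be the order to use according to the pydoc and, without the call to close(), it yields a satisfying result: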

Coro 0 done
Coro 1 done
Still 2 tasks pending
Task returned 1
Task returned 0
Outer except -- don't like 2
Waiting for pending futures to finish...
man, I was canceled: 4
man, I was canceled: 3

Process finished with exit code 0

However, when adding the call to close() (as shown above), I get two exceptions:

exception calling callback for <Future at 0x104f21438 state=finished returned int>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
    self._check_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

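This is at best annoying but, to me, totally puzzling; and, to make matters worse, I have been unable to figure out the right way of handling such a situation.

Thus, two questions:

  • how should I modify the code above in a way that with the call to close() included does not raise?
  • what actually happens if I don't call close() - in this trivial case, I presume it's largely redundant; but what might the consequences be in a "real" production code?

For my own personal satisfaction, also:

  • why does it raise at all? what more does the loop want from the coros/tasks: they either exited; raised; or were canceled: isn't this enough to keep it happy?

Many thanks in advance for any suggestions you may have!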

Distilled to its simplest, my use case is a bunch of blocking "old school" calls, which I wrap in the run_in_executor() and an outer coroutine; if any of those calls goes wrong, I want to stop progress, cancel the ones still outstanding

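This cannot work as intended because run_in_executor submits the function to a thread pool, and OS threads cannot be cancelled in Python (or in other languages that expose them). Cancelling the future returned by run_in_executor will attempt to cancel the underlying concurrent.futures.Future, but that only has an effect if the blocking function is not yet running, e.g. because the thread pool is busy. Once it starts to execute, it cannot be safely cancelled. Support for safe and reliable cancellation is one of the benefits of using asyncio compared to threads.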
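
To illustrate the point about cancellation, here is a minimal sketch using concurrent.futures directly. The single-worker pool and the two threading.Event objects are assumptions made only so the timing is deterministic; they are not part of the code in the question.

```python
import concurrent.futures
import threading

started = threading.Event()   # set once the first job is actually running
release = threading.Event()   # set to let the running job finish


def blocking():
    started.set()
    release.wait()
    return "finished"


# One worker only, so the second submission has to wait in the queue.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(blocking)
    started.wait()                      # the first job now occupies the worker
    queued = executor.submit(blocking)  # this one cannot start yet

    cancel_running = running.cancel()   # False: it is already executing
    cancel_queued = queued.cancel()     # True: it had not started

    release.set()
    result = running.result()           # the running job still completes
```

cancel() succeeds only for the job that was still waiting in the queue; the job that had already started runs to completion regardless.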

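If you are dealing with synchronous code, be it a legacy blocking call or longer-running CPU-bound code, you should run it with run_in_executor and incorporate a way to interrupt it. For example, the code could occasionally check a stop_requested flag and exit if it is true, perhaps by raising an exception. Then you can "cancel" those tasks by setting the appropriate flag or flags.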
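
A minimal sketch of such a flag, assuming the blocking work can be split into short steps. The names stop_requested, Interrupted and interruptible_blocking are hypothetical, and asyncio.run() requires Python 3.7 or later (the question uses 3.6).

```python
import asyncio
import threading
import time

stop_requested = threading.Event()  # hypothetical application-level flag


class Interrupted(Exception):
    """Raised by a worker that noticed the stop request."""


def interruptible_blocking(seconds):
    deadline = time.monotonic() + seconds
    while time.monotonic() < deadline:
        # Event.wait() doubles as a short sleep and returns True
        # as soon as the flag has been set.
        if stop_requested.wait(timeout=0.05):
            raise Interrupted(f"stopped before {seconds}s elapsed")
    return seconds


async def main():
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(None, interruptible_blocking, seconds)
        for seconds in (0.1, 5)
    ]
    # Wait for the short job, then ask the long one to give up.
    await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
    stop_requested.set()
    # Nothing is cancelled: every future is awaited until its worker returns.
    return await asyncio.gather(*futures, return_exceptions=True)


results = asyncio.run(main())
```

The long job ends with Interrupted shortly after the flag is set, so the loop can be closed without a worker thread reporting back later.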

how should I modify the code above in a way that with the call to close() included does not raise?

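As far as I can tell, there is currently no way to do that without modifying blocking and the top-level code. run_in_executor will insist on informing the event loop of the result, and this fails when the event loop is closed. It does not help that the asyncio future is cancelled, because the cancellation check is performed in the event loop thread, and the error occurs before that, when call_soon_threadsafe is invoked by the worker thread. (It might be possible to move the check to the worker thread, but it should be carefully analyzed whether that would lead to a race condition between the call to cancel() and the actual check.)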
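
The failing step can be reproduced in isolation. This is only a sketch of the mechanism visible in the traceback: the call is made here from the main thread on a loop that was never run, whereas in the question it is made by the executor's worker thread.

```python
import asyncio

loop = asyncio.new_event_loop()
loop.close()

try:
    # The same call the worker thread makes to hand its result to the loop.
    loop.call_soon_threadsafe(print, "never runs")
except RuntimeError as error:
    message = str(error)
else:
    message = None
```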

why does it raise at all? what more does the loop want from the coros/tasks: they either exited; raised; or were canceled: isn't this enough to keep it happy?

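It wants the blocking functions passed to run_in_executor (literally blocking in the question) that have already been started to finish running before the event loop is closed. You cancelled the asyncio future, but the underlying concurrent future still wants to "phone home", only to find the loop closed.

It is not obvious whether this is a bug in asyncio, or whether you are simply not supposed to close an event loop until you have somehow ensured that all work submitted to run_in_executor is done. Doing so requires the following changes:

  • Don't attempt to cancel the pending futures. Cancelling them looks correct superficially, but it prevents you from being able to wait() for those futures, because asyncio will consider them complete.
  • Instead, send an application-specific event to your background tasks informing them that they need to abort.
  • Call loop.run_until_complete(asyncio.wait(pending)) before loop.close().

With these modifications (except for the application-specific event; I just let the sleep()s finish their course), no exception appeared.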
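
The changes listed above could look roughly like this. It is a sketch, not the exact code that was tested: the sleeps are shortened to tenths of a second, asyncio.new_event_loop() replaces get_event_loop(), and the application-specific abort event is left out, so the pending work simply runs to completion.

```python
import asyncio
import time


def blocking(num):
    time.sleep(num * 0.1)  # shortened from the question's time.sleep(num)
    if num == 2:
        raise ValueError("don't like 2")
    return num


async def my_coro(loop, num):
    return await loop.run_in_executor(None, blocking, num)


def main():
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(my_coro(loop, num)) for num in range(5)]
    pending = set()
    errors = []
    try:
        done, pending = loop.run_until_complete(
            asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION))
        for task in done:
            if task.exception() is not None:
                errors.append(task.exception())
    finally:
        # Do not cancel: wait until every worker has reported back,
        # and only then close the loop.
        if pending:
            loop.run_until_complete(asyncio.wait(pending))
        loop.close()
    return errors


errors = main()
```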

what actually happens if I don't call close() - in this trivial case, I presume it's largely redundant; but what might the consequences be in a "real" production code?

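Since a typical event loop runs for as long as the application does, there should be no issue with not calling close() at the very end of the program. The operating system will clean up the resources on program exit anyway.

Calling loop.close() is important for event loops that have a clear lifetime. For example, a library might create a fresh event loop for a particular task, run it in a dedicated thread, and then dispose of it. Failing to close such a loop could leak its internal resources (such as the pipe it uses for inter-thread wakeup) and cause the program to fail. Another example is test suites, which often start a new event loop for each unit test to ensure that test environments stay separate.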
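
As an illustration of a loop with a clear lifetime, here is a minimal sketch of a short-lived loop owned by a dedicated thread. The names job, worker and outcome are hypothetical.

```python
import asyncio
import threading

outcome = {}


async def job():
    await asyncio.sleep(0.01)
    return "done"


def worker():
    # The loop belongs to this thread and lives only as long as the job.
    loop = asyncio.new_event_loop()
    try:
        outcome["result"] = loop.run_until_complete(job())
    finally:
        loop.close()  # release the loop's internal resources
        outcome["closed"] = loop.is_closed()


thread = threading.Thread(target=worker)
thread.start()
thread.join()
```

Closing in a finally block means the loop is released even if the job raises.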


EDIT: filed a bug for this issue.
EDIT 2: the bug has been fixed by the developers.

Until the upstream issue is fixed, another way to work around the problem is by replacing the use of run_in_executor with a custom version without the flaw. While rolling one's own run_in_executor sounds like a bad idea at first, it is in fact only a small piece of glue between a concurrent.futures future and an asyncio future.

A simple version of run_in_executor can be cleanly implemented using the public API of those two classes:

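import asyncio
import functools


def run_in_executor(executor, fn, *args):
    """Submit FN to EXECUTOR and return an asyncio future."""
    loop = asyncio.get_event_loop()
    if args:
        fn = functools.partial(fn, *args)
    work_future = executor.submit(fn)
    aio_future = loop.create_future()
    aio_cancelled = False

    def work_done(_f):
        # Runs in the worker thread. Contact the loop only if the
        # asyncio future has not been cancelled in the meantime.
        if not aio_cancelled:
            loop.call_soon_threadsafe(set_result)

    def check_cancel(_f):
        # Runs in the loop thread when the asyncio future completes;
        # passes a cancellation on to the concurrent future.
        nonlocal aio_cancelled
        if aio_future.cancelled():
            work_future.cancel()
            aio_cancelled = True

    def set_result():
        # Runs in the loop thread; copies the outcome of the
        # concurrent future into the asyncio future.
        if aio_future.cancelled():
            # Cancelled after work_done() had already scheduled this
            # call; setting a result now would raise InvalidStateError.
            return
        if work_future.cancelled():
            aio_future.cancel()
        elif work_future.exception() is not None:
            aio_future.set_exception(work_future.exception())
        else:
            aio_future.set_result(work_future.result())

    work_future.add_done_callback(work_done)
    aio_future.add_done_callback(check_cancel)

    return aio_future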

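When loop.run_in_executor(None, blocking, num) is replaced with run_in_executor(executor, blocking, num), where executor is a ThreadPoolExecutor created in main(), the code works without other modifications.

Of course, in this variant the synchronous functions will continue to run in the other thread to completion despite being cancelled, but that is unavoidable without modifying them to support explicit interruption.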