如何用 aiozmq/zmq 正确关闭?

How to close properly with aiozmq/zmq?

当我的后端不是 运行 时,我 运行 遇到了这个问题。
你可以在这里找到一个非常简单的演示来重现我的案例(如果你想节省一些测试时间,你可以将超时减少到 1):

#!/usr/bin/env python3

import threading
import logging
import asyncio
from aiozmq import rpc

logging.getLogger().setLevel(logging.DEBUG)

async def go():
    logging.debug('Go !')
    client = None
    try:
        client = await rpc.connect_rpc(connect='tcp://127.0.0.1:5555', timeout=5)
        await client.call.remote(1, 2)
    except Exception as e:
        logging.exception(e)
    finally:
        if client is not None:
            client.close()
            await client.wait_closed()

    logging.debug('done')

loop = asyncio.get_event_loop()
loop.run_until_complete(go())
loop.close()

logging.debug(asyncio.Task.all_tasks())
logging.debug(threading.enumerate())

您会注意到 coro(go 和我们 wait_for)都已完成,而 python 仅跟踪 MainThread
但是您可以在顶部 (htop) 看到您的脚本实际上有 2 个线程 运行.

我不是 100% 确定,但我认为我们收到的 EPOLLOUT|EPOLLERR|EPOLLHUP 是 HUP=HangUp=ConnectionRefused。

我在 zmq 库中找到了 remaining/invisible 个线程。

有人知道发生了什么以及如何正确阻止它吗?

我还在 github 上将此作为 issue 发布并得到了答案。

这实际上是由于Linger period

解决方案就是在调用 connect_rpc 之后添加 client.transport.setsockopt(zmq.LINGER, 0)


如果循环不是 运行,在 finally 关闭的 if 中添加 loop.is_running() 也可能有用,以防止任何试图关闭套接字的尝试不再(例如由于另一个例外)。