如何用 aiozmq/zmq 正确关闭?
How to close properly with aiozmq/zmq?
当我的后端不是 运行 时,我 运行 遇到了这个问题。
你可以在这里找到一个非常简单的演示来重现我的案例(如果你想节省一些测试时间,你可以将超时减少到 1):
#!/usr/bin/env python3
import threading
import logging
import asyncio
from aiozmq import rpc
logging.getLogger().setLevel(logging.DEBUG)
async def go():
logging.debug('Go !')
client = None
try:
client = await rpc.connect_rpc(connect='tcp://127.0.0.1:5555', timeout=5)
await client.call.remote(1, 2)
except Exception as e:
logging.exception(e)
finally:
if client is not None:
client.close()
await client.wait_closed()
logging.debug('done')
loop = asyncio.get_event_loop()
loop.run_until_complete(go())
loop.close()
logging.debug(asyncio.Task.all_tasks())
logging.debug(threading.enumerate())
您会注意到 coro(go
和我们 wait_for
)都已完成,而 python 仅跟踪 MainThread
。
但是您可以在顶部 (htop
) 看到您的脚本实际上有 2 个线程 运行.
- 1 在
epoll_wait(10,
上被阻止
1 循环:
epoll_wait(13, [], 256, 168) = 0
socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) = 3
fcntl(3, F_SETFD, FD_CLOEXEC) = 0
fcntl(3, F_GETFL) = 0x2 (flags O_RDWR)
fcntl(3, F_SETFL, O_RDWR|O_NONBLOCK) = 0
connect(3, {sa_family=AF_INET, sin_port=htons(5555), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress)
epoll_ctl(13, EPOLL_CTL_ADD, 3, {0, {u32=1275071824, u64=140476770422096}}) = 0
epoll_ctl(13, EPOLL_CTL_MOD, 3, {EPOLLOUT, {u32=1275071824, u64=140476770422096}}) = 0
epoll_wait(13, [{EPOLLOUT|EPOLLERR|EPOLLHUP, {u32=1275071824, u64=140476770422096}}], 256, -1) = 1
epoll_ctl(13, EPOLL_CTL_DEL, 3, 0x7fc34c000d54) = 0
getsockopt(3, SOL_SOCKET, SO_ERROR, [111], [4]) = 0
close(3) = 0
我不是 100% 确定,但我认为我们收到的 EPOLLOUT|EPOLLERR|EPOLLHUP
是 HUP=HangUp=ConnectionRefused。
我在 zmq 库中找到了 remaining/invisible 个线程。
有人知道发生了什么以及如何正确阻止它吗?
我还在 github 上将此作为 issue 发布并得到了答案。
这实际上是由于Linger period。
解决方案就是在调用 connect_rpc
之后添加 client.transport.setsockopt(zmq.LINGER, 0)
。
如果循环不是 运行,在 finally
关闭的 if
中添加 loop.is_running()
也可能有用,以防止任何试图关闭套接字的尝试不再(例如由于另一个例外)。
当我的后端不是 运行 时,我 运行 遇到了这个问题。
你可以在这里找到一个非常简单的演示来重现我的案例(如果你想节省一些测试时间,你可以将超时减少到 1):
#!/usr/bin/env python3
import threading
import logging
import asyncio
from aiozmq import rpc
logging.getLogger().setLevel(logging.DEBUG)
async def go():
logging.debug('Go !')
client = None
try:
client = await rpc.connect_rpc(connect='tcp://127.0.0.1:5555', timeout=5)
await client.call.remote(1, 2)
except Exception as e:
logging.exception(e)
finally:
if client is not None:
client.close()
await client.wait_closed()
logging.debug('done')
loop = asyncio.get_event_loop()
loop.run_until_complete(go())
loop.close()
logging.debug(asyncio.Task.all_tasks())
logging.debug(threading.enumerate())
您会注意到 coro(go
和我们 wait_for
)都已完成,而 python 仅跟踪 MainThread
。
但是您可以在顶部 (htop
) 看到您的脚本实际上有 2 个线程 运行.
- 1 在
epoll_wait(10,
上被阻止
1 循环:
epoll_wait(13, [], 256, 168) = 0 socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) = 3 fcntl(3, F_SETFD, FD_CLOEXEC) = 0 fcntl(3, F_GETFL) = 0x2 (flags O_RDWR) fcntl(3, F_SETFL, O_RDWR|O_NONBLOCK) = 0 connect(3, {sa_family=AF_INET, sin_port=htons(5555), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress) epoll_ctl(13, EPOLL_CTL_ADD, 3, {0, {u32=1275071824, u64=140476770422096}}) = 0 epoll_ctl(13, EPOLL_CTL_MOD, 3, {EPOLLOUT, {u32=1275071824, u64=140476770422096}}) = 0 epoll_wait(13, [{EPOLLOUT|EPOLLERR|EPOLLHUP, {u32=1275071824, u64=140476770422096}}], 256, -1) = 1 epoll_ctl(13, EPOLL_CTL_DEL, 3, 0x7fc34c000d54) = 0 getsockopt(3, SOL_SOCKET, SO_ERROR, [111], [4]) = 0 close(3) = 0
我不是 100% 确定,但我认为我们收到的 EPOLLOUT|EPOLLERR|EPOLLHUP
是 HUP=HangUp=ConnectionRefused。
我在 zmq 库中找到了 remaining/invisible 个线程。
有人知道发生了什么以及如何正确阻止它吗?
我还在 github 上将此作为 issue 发布并得到了答案。
这实际上是由于Linger period。
解决方案就是在调用 connect_rpc
之后添加 client.transport.setsockopt(zmq.LINGER, 0)
。
如果循环不是 运行,在 finally
关闭的 if
中添加 loop.is_running()
也可能有用,以防止任何试图关闭套接字的尝试不再(例如由于另一个例外)。