如何在 python asyncio 中等待 select.select 调用
How to await a select.select call in python asyncio
我有一个 python 3.6 程序,我在其中使用 asyncio 包事件循环。我的一个数据源来自 api,它不是围绕 asyncio 构建的。我的连接对象包含一个名为 _connection
的成员,它只是一个 python 套接字。现在我可以在 select 语句中使用它来判断数据何时准备就绪。
async def run(self):
while True:
if select.select([self._q._connection], [], [])[0]:
msg = self._q.receive()
print(msg)
我真正想要的是...
async def run(self):
while True:
if await select.select([self._q._connection], [], [])[0]:
msg = self._q.receive()
print(msg)
我知道 asyncio 事件循环中有一个 sock_recv
函数,但是我需要 api 来进行实际的读取和解码。我试过了,但它只会通过等待,我认为这是有道理的,因为我说的是 0 字节。
async def run(self):
while True:
print('A')
await asyncio.get_event_loop().sock_recv(self._q._connection, 0)
print('B')
msg = self._q.receive()
print(msg)
目前我能想到的唯一解决方案是在 select 中添加一个小超时,然后在没有数据时调用 asyncio.sleep
但这似乎是一种低效的方法。我希望有 asyncio.select
这样的东西。有人想推荐另一种方法吗?
编辑:现在我想出了这个。我不喜欢它,因为它增加了额外的四分之一秒延迟(对于我的应用程序来说可能无关紧要,但它仍然让我烦恼。)
async def run(self):
while True:
if select.select([self._q._connection], [], [], 0)[0]:
print(self._q.receive())
else:
await asyncio.sleep(0.25)
您可以使用 loop.add_reader 等待套接字的读取可用性:
async def watch(fd):
future = asyncio.Future()
loop.add_reader(fd, future.set_result, None)
future.add_done_callback(lambda f: loop.remove_reader(fd))
await future
async def run(self):
while True:
await watch(self._q._connection)
msg = self._q.receive()
print(msg)
但是,要避免您提到的库的所有阻塞 IO 调用而不完全重写它会非常棘手。相反,我建议使用 loop.run_in_executor 方法来安排线程池中的阻塞 IO 调用:
async def run(self):
loop = asyncio.get_event_loop()
while True:
msg = await loop.run_in_executor(None, self._q.receive)
print(msg)
我有一个 python 3.6 程序,我在其中使用 asyncio 包事件循环。我的一个数据源来自 api,它不是围绕 asyncio 构建的。我的连接对象包含一个名为 _connection
的成员,它只是一个 python 套接字。现在我可以在 select 语句中使用它来判断数据何时准备就绪。
async def run(self):
while True:
if select.select([self._q._connection], [], [])[0]:
msg = self._q.receive()
print(msg)
我真正想要的是...
async def run(self):
while True:
if await select.select([self._q._connection], [], [])[0]:
msg = self._q.receive()
print(msg)
我知道 asyncio 事件循环中有一个 sock_recv
函数,但是我需要 api 来进行实际的读取和解码。我试过了,但它只会通过等待,我认为这是有道理的,因为我说的是 0 字节。
async def run(self):
while True:
print('A')
await asyncio.get_event_loop().sock_recv(self._q._connection, 0)
print('B')
msg = self._q.receive()
print(msg)
目前我能想到的唯一解决方案是在 select 中添加一个小超时,然后在没有数据时调用 asyncio.sleep
但这似乎是一种低效的方法。我希望有 asyncio.select
这样的东西。有人想推荐另一种方法吗?
编辑:现在我想出了这个。我不喜欢它,因为它增加了额外的四分之一秒延迟(对于我的应用程序来说可能无关紧要,但它仍然让我烦恼。)
async def run(self):
while True:
if select.select([self._q._connection], [], [], 0)[0]:
print(self._q.receive())
else:
await asyncio.sleep(0.25)
您可以使用 loop.add_reader 等待套接字的读取可用性:
async def watch(fd):
future = asyncio.Future()
loop.add_reader(fd, future.set_result, None)
future.add_done_callback(lambda f: loop.remove_reader(fd))
await future
async def run(self):
while True:
await watch(self._q._connection)
msg = self._q.receive()
print(msg)
但是,要避免您提到的库的所有阻塞 IO 调用而不完全重写它会非常棘手。相反,我建议使用 loop.run_in_executor 方法来安排线程池中的阻塞 IO 调用:
async def run(self):
loop = asyncio.get_event_loop()
while True:
msg = await loop.run_in_executor(None, self._q.receive)
print(msg)