尝试使用 run_forever 创建一个永远运行的异步函数
Trying to make an async function that runs forever using run_forever
我正在尝试访问一个 websocket 来收听一些信息,所以我想使用 run_forever 命令继续收听直到我停止程序,但是当我这样做时我得到下面的错误,可以有人告诉我我做错了什么吗?
代码:
loop = asyncio.get_event_loop()
async def listen():
url = "websocket url starting with wss"
async with websockets.connect(url) as ws:
msg = await ws.recv()
print(msg)
loop.run_forever(listen())
错误然后说:
RuntimeWarning: coroutine 'listen' was never awaited
loop.run_forever(listen())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
File "C:\Users\...", line 18, in <module>
loop.run_forever(listen())
TypeError: run_forever() takes 1 positional argument but 2 were given
我尝试将您的脚本加载到 Python 3.9.6 中,但我看到了您的错误。
这就是我得到它的方式 运行(您需要输入正确的 URL):
import asyncio, websockets
async def listen():
url = "websocket url starting with wss"
async with websockets.connect(url) as ws:
msg = await ws.recv()
print(msg)
async def main():
loop = asyncio.get_event_loop()
loop.run_forever(await listen())
if __name__ == "__main__":
asyncio.run(main())
我希望它能帮助你进一步提高你的计划。
我正在尝试访问一个 websocket 来收听一些信息,所以我想使用 run_forever 命令继续收听直到我停止程序,但是当我这样做时我得到下面的错误,可以有人告诉我我做错了什么吗?
代码:
loop = asyncio.get_event_loop()
async def listen():
url = "websocket url starting with wss"
async with websockets.connect(url) as ws:
msg = await ws.recv()
print(msg)
loop.run_forever(listen())
错误然后说:
RuntimeWarning: coroutine 'listen' was never awaited
loop.run_forever(listen())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
File "C:\Users\...", line 18, in <module>
loop.run_forever(listen())
TypeError: run_forever() takes 1 positional argument but 2 were given
我尝试将您的脚本加载到 Python 3.9.6 中,但我看到了您的错误。
这就是我得到它的方式 运行(您需要输入正确的 URL):
import asyncio, websockets
async def listen():
url = "websocket url starting with wss"
async with websockets.connect(url) as ws:
msg = await ws.recv()
print(msg)
async def main():
loop = asyncio.get_event_loop()
loop.run_forever(await listen())
if __name__ == "__main__":
asyncio.run(main())
我希望它能帮助你进一步提高你的计划。