asyncio - 启动无限 loop.create_connection()
asyncio - launching infinite loop.create_connection()
我一直在尝试创建一个无限的客户端实例链,但没有成功。
我正在开发一个 asyncio 应用程序,这个应用程序还有很多其他东西,比如 运行 一个带有 loop.create_server() 的服务器,需要每 10 秒连接到一个服务器列表,发送一些数据,然后断开连接。
我不断收到 2 个错误:"runtimeError: Event loop is running." 或 "asyncio > Task was destroyed but it is pending!"
下面的代码有效。
import asyncio
from tcp.client import Client
def send_to_peers(data):
for index in range(1, 3): #this for-loop just for simulating a list of peers
try:
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: Client(), '127.0.0.1', 10000 + index)
_, proto = loop.run_until_complete(coro)
msg = data + "-" + str(index) + "\n"
proto.transport.write(str.encode(msg))
proto.transport.close()
except ConnectionRefusedError as exc:
print(exc)
def infinite():
for index in range(5): #again this should be a While True:
#there should be here an asyncio.sleep(10)
send_to_peers(str(index))
infinite()
但是当我从 main_loop 调用它时,事情开始中断。
async def infinite_loop():
for index in range(5):
print("loop n " + str(index))
task = asyncio.Task(send_to_peers(str(index)))
await asyncio.sleep(10)
task.cancel()
with suppress(asyncio.CancelledError):
await task
main_loop = asyncio.get_event_loop()
main_loop.run_until_complete(infinite_loop())
main_loop.run_forever()
我试过把 main_loop 给 send_to_peers ,把它给客户端(循环) class,我试着弯腰并关闭循环,删除任务,使用 ensure_future 的奇怪组合,但没有任何效果。
我尽可能多地用谷歌搜索,我读到嵌套无限循环是不好的,但我没有找到任何其他方法。
我最后的希望是使用线程,但即使我认为它可行,它也不是一个优雅的解决方案,也不是正确的解决方案。
我习惯于使用 Node,所以如果我犯了一个愚蠢的错误,请原谅,我认为 2 周后我可以做到,但我在这里。
如果有任何帮助,我将不胜感激。我卡住了。谢谢!
PS:
Client() class 非常基础:
import asyncio
import logging
import sys
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s > %(message)s',
stream=sys.stderr
)
class Client(asyncio.Protocol):
def __init__(self):
self.log = logging.getLogger('client')
self.address = None
self.transport = None
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log.debug('{}:{} connected'.format(*self.address))
def data_received(self, data):
self.log.debug('{}:{} just sent {!r}'.format(*self.address, data))
def eof_received(self):
self.log.debug('{}:{} sent EOF'.format(*self.address))
def connection_lost(self, error=""):
self.log.debug('{}:{} disconnected'.format(*self.address))
self.transport.close()
I keep getting 2 errors: "runtimeError: Event loop is running." or "asyncio > Task was destroyed but it is pending!"
如您所见,asyncio 事件循环 do not nest。
要删除嵌套,您应该使用 async def
将 send_to_peers
定义为协程。其中 loop.run_until_complete(coro)
应更改为 await coro
。一旦 send_to_peers
是协程,您就可以调用它:
使用loop.run_until_complete(send_to_peers(...))
阻止infinite
等代码
来自异步代码,例如 infinite_loop
使用 await send_to_peers(...)
。
在 infinite_loop
的情况下,您可以使用 asyncio.wait_for
:
实现超时
try:
await asyncio.wait_for(send_to_peers(str(index)), 10)
except asyncio.TimeoutError:
# ... timeout ...
我一直在尝试创建一个无限的客户端实例链,但没有成功。
我正在开发一个 asyncio 应用程序,这个应用程序还有很多其他东西,比如 运行 一个带有 loop.create_server() 的服务器,需要每 10 秒连接到一个服务器列表,发送一些数据,然后断开连接。
我不断收到 2 个错误:"runtimeError: Event loop is running." 或 "asyncio > Task was destroyed but it is pending!"
下面的代码有效。
import asyncio
from tcp.client import Client
def send_to_peers(data):
for index in range(1, 3): #this for-loop just for simulating a list of peers
try:
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: Client(), '127.0.0.1', 10000 + index)
_, proto = loop.run_until_complete(coro)
msg = data + "-" + str(index) + "\n"
proto.transport.write(str.encode(msg))
proto.transport.close()
except ConnectionRefusedError as exc:
print(exc)
def infinite():
for index in range(5): #again this should be a While True:
#there should be here an asyncio.sleep(10)
send_to_peers(str(index))
infinite()
但是当我从 main_loop 调用它时,事情开始中断。
async def infinite_loop():
for index in range(5):
print("loop n " + str(index))
task = asyncio.Task(send_to_peers(str(index)))
await asyncio.sleep(10)
task.cancel()
with suppress(asyncio.CancelledError):
await task
main_loop = asyncio.get_event_loop()
main_loop.run_until_complete(infinite_loop())
main_loop.run_forever()
我试过把 main_loop 给 send_to_peers ,把它给客户端(循环) class,我试着弯腰并关闭循环,删除任务,使用 ensure_future 的奇怪组合,但没有任何效果。
我尽可能多地用谷歌搜索,我读到嵌套无限循环是不好的,但我没有找到任何其他方法。
我最后的希望是使用线程,但即使我认为它可行,它也不是一个优雅的解决方案,也不是正确的解决方案。
我习惯于使用 Node,所以如果我犯了一个愚蠢的错误,请原谅,我认为 2 周后我可以做到,但我在这里。
如果有任何帮助,我将不胜感激。我卡住了。谢谢!
PS: Client() class 非常基础:
import asyncio
import logging
import sys
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s > %(message)s',
stream=sys.stderr
)
class Client(asyncio.Protocol):
def __init__(self):
self.log = logging.getLogger('client')
self.address = None
self.transport = None
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log.debug('{}:{} connected'.format(*self.address))
def data_received(self, data):
self.log.debug('{}:{} just sent {!r}'.format(*self.address, data))
def eof_received(self):
self.log.debug('{}:{} sent EOF'.format(*self.address))
def connection_lost(self, error=""):
self.log.debug('{}:{} disconnected'.format(*self.address))
self.transport.close()
I keep getting 2 errors: "runtimeError: Event loop is running." or "asyncio > Task was destroyed but it is pending!"
如您所见,asyncio 事件循环 do not nest。
要删除嵌套,您应该使用 async def
将 send_to_peers
定义为协程。其中 loop.run_until_complete(coro)
应更改为 await coro
。一旦 send_to_peers
是协程,您就可以调用它:
使用
loop.run_until_complete(send_to_peers(...))
阻止来自异步代码,例如
infinite_loop
使用await send_to_peers(...)
。
infinite
等代码
在 infinite_loop
的情况下,您可以使用 asyncio.wait_for
:
try:
await asyncio.wait_for(send_to_peers(str(index)), 10)
except asyncio.TimeoutError:
# ... timeout ...