如何正确打开和关闭 for 循环中的事件循环?
How to properly open and close an event-loop in a for loop?
我有一个嵌套在 wallet_loop
中的异步循环 watch_book
和 main
我想在 for
循环中执行。这样做的正确方法是什么?
我的问题是,在第一个帐户终止它的循环并关闭它之后,第二个帐户创建了一个新循环,但它仍然抱怨循环已关闭。这是什么原因?
async def watch_book(account, client, market):
while True:
ob = await client.watch_order_book(market.symbol)
if stop():
break
await client.sleep(1000)
async def wallet_loop(account, loop, wallet):
client = getattr(name, exid)({'enableRateLimit': True, 'asyncio_loop': loop, })
do_more_stuff()
ws_loops = [watch_book(account, client, market) for market in markets]
await asyncio.gather(*ws_loops)
await client.close()
async def main(account, loop, wallets):
wallet_loops = [wallet_loop(account, loop, wallet) for wallet in wallets]
await asyncio.gather(*wallet_loops)
for account in accounts:
wallets = get_wallets()
loop = asyncio.get_event_loop()
if loop.is_closed():
log.info('Create a new loop')
loop = asyncio.new_event_loop()
gp = asyncio.wait([main(account, loop, wallets)])
loop.run_until_complete(gp)
loop.close()
我得到的错误:
2021-07-02 11:45:54.459777 [info ] Create a new loop
Exception ignored in: <coroutine object throttle.<locals>.run at 0x7f80ce1492b0>
Traceback (most recent call last):
File "/home/aaa/env/lib/python3.6/site-packages/name/async_support/base/throttle.py", line 51, in run
future.set_exception(excp)
File "/usr/lib/python3.6/asyncio/base_events.py", line 591, in call_soon
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 377, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
我想规范的做法是将您的程序放在异步函数中,然后 运行 将其全部放在事件循环中:
async def watch_book(account, client, market):
while True:
ob = await client.watch_order_book(market.symbol)
if stop():
break
await client.sleep(1000)
async def wallet_loop(account, wallet):
client = getattr(name, exid)({'enableRateLimit': True, 'asyncio_loop': asyncio.get_running_loop()})
do_more_stuff()
ws_loops = [watch_book(account, client, market) for market in markets]
await asyncio.gather(*ws_loops)
await client.close()
async def main(account, wallets):
wallet_loops = [wallet_loop(account, wallet) for wallet in wallets]
await asyncio.gather(*wallet_loops)
async def main_orchestrator(accounts):
for account in accounts:
wallets = get_wallets()
gp = await main(account, wallets)
asyncio.run(main_orchestrator(accounts))
而且您甚至可以轻松并行化所有帐户的处理
如果将函数更改为:
async def main(accounts):
await asyncio.gather(*(main(account, wallets) for account in accounts))
如果出于某种原因你真的需要每个批次在一个循环中 运行,尽管如此,只需将 loop = asyncio.get_event_loop()
替换为 loop = asyncio.new_event_loop()
。
我有一个嵌套在 wallet_loop
中的异步循环 watch_book
和 main
我想在 for
循环中执行。这样做的正确方法是什么?
我的问题是,在第一个帐户终止它的循环并关闭它之后,第二个帐户创建了一个新循环,但它仍然抱怨循环已关闭。这是什么原因?
async def watch_book(account, client, market):
while True:
ob = await client.watch_order_book(market.symbol)
if stop():
break
await client.sleep(1000)
async def wallet_loop(account, loop, wallet):
client = getattr(name, exid)({'enableRateLimit': True, 'asyncio_loop': loop, })
do_more_stuff()
ws_loops = [watch_book(account, client, market) for market in markets]
await asyncio.gather(*ws_loops)
await client.close()
async def main(account, loop, wallets):
wallet_loops = [wallet_loop(account, loop, wallet) for wallet in wallets]
await asyncio.gather(*wallet_loops)
for account in accounts:
wallets = get_wallets()
loop = asyncio.get_event_loop()
if loop.is_closed():
log.info('Create a new loop')
loop = asyncio.new_event_loop()
gp = asyncio.wait([main(account, loop, wallets)])
loop.run_until_complete(gp)
loop.close()
我得到的错误:
2021-07-02 11:45:54.459777 [info ] Create a new loop
Exception ignored in: <coroutine object throttle.<locals>.run at 0x7f80ce1492b0>
Traceback (most recent call last):
File "/home/aaa/env/lib/python3.6/site-packages/name/async_support/base/throttle.py", line 51, in run
future.set_exception(excp)
File "/usr/lib/python3.6/asyncio/base_events.py", line 591, in call_soon
self._check_closed()
File "/usr/lib/python3.6/asyncio/base_events.py", line 377, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
我想规范的做法是将您的程序放在异步函数中,然后 运行 将其全部放在事件循环中:
async def watch_book(account, client, market):
while True:
ob = await client.watch_order_book(market.symbol)
if stop():
break
await client.sleep(1000)
async def wallet_loop(account, wallet):
client = getattr(name, exid)({'enableRateLimit': True, 'asyncio_loop': asyncio.get_running_loop()})
do_more_stuff()
ws_loops = [watch_book(account, client, market) for market in markets]
await asyncio.gather(*ws_loops)
await client.close()
async def main(account, wallets):
wallet_loops = [wallet_loop(account, wallet) for wallet in wallets]
await asyncio.gather(*wallet_loops)
async def main_orchestrator(accounts):
for account in accounts:
wallets = get_wallets()
gp = await main(account, wallets)
asyncio.run(main_orchestrator(accounts))
而且您甚至可以轻松并行化所有帐户的处理 如果将函数更改为:
async def main(accounts):
await asyncio.gather(*(main(account, wallets) for account in accounts))
如果出于某种原因你真的需要每个批次在一个循环中 运行,尽管如此,只需将 loop = asyncio.get_event_loop()
替换为 loop = asyncio.new_event_loop()
。