如何正确打开和关闭 for 循环中的事件循环?

How to properly open and close an event-loop in a for loop?

我有一个嵌套在 wallet_loop 中的异步循环 watch_bookmain 我想在 for 循环中执行。这样做的正确方法是什么?

我的问题是,在第一个帐户终止它的循环并关闭它之后,第二个帐户创建了一个新循环,但它仍然抱怨循环已关闭。这是什么原因?

async def watch_book(account, client, market):
    while True:
        ob = await client.watch_order_book(market.symbol)
        if stop():
            break
        await client.sleep(1000)

async def wallet_loop(account, loop, wallet):
    client = getattr(name, exid)({'enableRateLimit': True, 'asyncio_loop': loop, })
    do_more_stuff()
    ws_loops = [watch_book(account, client, market) for market in markets]
    await asyncio.gather(*ws_loops)
    await client.close()


async def main(account, loop, wallets):
    wallet_loops = [wallet_loop(account, loop, wallet) for wallet in wallets]
    await asyncio.gather(*wallet_loops)


for account in accounts:

    wallets = get_wallets()

    loop = asyncio.get_event_loop()

    if loop.is_closed():
        log.info('Create a new loop')
        loop = asyncio.new_event_loop()

    gp = asyncio.wait([main(account, loop, wallets)])
    loop.run_until_complete(gp)
    loop.close()

我得到的错误:

2021-07-02 11:45:54.459777 [info     ] Create a new loop

Exception ignored in: <coroutine object throttle.<locals>.run at 0x7f80ce1492b0>
Traceback (most recent call last):
  File "/home/aaa/env/lib/python3.6/site-packages/name/async_support/base/throttle.py", line 51, in run
    future.set_exception(excp)
  File "/usr/lib/python3.6/asyncio/base_events.py", line 591, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 377, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

我想规范的做法是将您的程序放在异步函数中,然后 运行 将其全部放在事件循环中:



async def watch_book(account, client, market):
    while True:
        ob = await client.watch_order_book(market.symbol)
        if stop():
            break
        await client.sleep(1000)

async def wallet_loop(account, wallet):
    client = getattr(name, exid)({'enableRateLimit': True, 'asyncio_loop': asyncio.get_running_loop()})
    do_more_stuff()
    ws_loops = [watch_book(account, client, market) for market in markets]
    await asyncio.gather(*ws_loops)
    await client.close()

async def main(account, wallets):
    wallet_loops = [wallet_loop(account, wallet) for wallet in wallets]
    await asyncio.gather(*wallet_loops)
    
async def main_orchestrator(accounts):
    for account in accounts:
        wallets = get_wallets()
        gp = await main(account, wallets)

asyncio.run(main_orchestrator(accounts))

而且您甚至可以轻松并行化所有帐户的处理 如果将函数更改为:

async def main(accounts):
    await asyncio.gather(*(main(account, wallets) for account in accounts)) 

如果出于某种原因你真的需要每个批次在一个循环中 运行,尽管如此,只需将 loop = asyncio.get_event_loop() 替换为 loop = asyncio.new_event_loop()