Asyncio 脚本执行缓慢,类似于同步脚本

Asyncio script performs slowly, similar to sync script

我正在编写一个 asyncio 脚本,通过 ib_insync 库从 Interactive Brokers 检索股票数据。

虽然脚本在运行,但性能与串行脚本类似。我希望看到速度的显着提高。此代码将用于生产。

我是 asyncio 的新手,感觉缺少一个重要元素。下面是完整的脚本。非常感谢帮助加快速度。谢谢

import asyncio
import ib_insync as ibi
import nest_asyncio
import pandas as pd

nest_asyncio.apply()

class App:
    async def run(self, symbols):
        print(f"1 start run: {symbols}")
        self.ib = ibi.IB()
        with await self.ib.connectAsync("127.0.0.1", "****", clientId="****"):
            contracts = [ibi.Stock(symbol, "SMART", "USD") for symbol in symbols]
            bars_dict = dict()
            print(f"2 start loop: {symbols}")
            for contract in contracts:
                bars = await self.ib.reqHistoricalDataAsync(
                    contract,
                    endDateTime="",
                    durationStr="1 M",
                    barSizeSetting="1 day",
                    whatToShow="ADJUSTED_LAST",
                    useRTH=True,
                )
                # Convert to dataframes.
                bars_dict[contract.symbol] = ibi.util.df(bars)
            print(f"3 End bars: {symbols}")
            return bars_dict

    async def main(self):
        res = await asyncio.gather(self.run(self.sp500(0, 100)))
        return res

    def stop(self):
        self.ib.disconnect()

    def sp500(self, start=None, end=10):
        payload = pd.read_html(
            "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
        )
        first_table = payload[0]
        sp500 = first_table["Symbol"].sort_values().to_list()
        return sp500[start:end]


if __name__ == "__main__":
    import time

    start = time.time()

    app = App()
    try:
        print(f"START CALL")
        res = asyncio.run(app.main())
        print(f"END CALL")
    except (KeyboardInterrupt, SystemExit):
        app.stop()
    for ticker, bars in res[0].items():
        print(f"{ticker}\n{bars}")

    print(f"Total time: {(time.time() - start)}")

您的脚本正在按顺序 运行ning。 main 中对 asyncio.gather() 的调用是无用的,因为它只用一个协程调用。您应该使用多个协同程序调用它,以使它们 运行 并行。

例如,您可以从 main 中删除 asyncio.gather()(那里只有 await self.run(self.sp500(0, 100)),而是使用它来并行化对 reqHistoricalDataAsync:[=17= 的调用]

class App:
    async def run(self, symbols):
        print(f"1 start run: {symbols}")
        self.ib = ibi.IB()
        with await self.ib.connectAsync("127.0.0.1", "****", clientId="****"):
            contracts = [ibi.Stock(symbol, "SMART", "USD") for symbol in symbols]
            print(f"2 start loop: {symbols}")
            all_bars = await asyncio.gather(*[
                self.ib.reqHistoricalDataAsync(
                    contract,
                    endDateTime="",
                    durationStr="1 M",
                    barSizeSetting="1 day",
                    whatToShow="ADJUSTED_LAST",
                    useRTH=True,
                )
                for contract in contracts
            ])
            bars_dict = {}
            for contract, bars in zip(contracts, all_bars):
                # Convert to dataframes.
                bars_dict[contract.symbol] = ibi.util.df(bars)
            print(f"3 End bars: {symbols}")
            return bars_dict