while循环和asyncio

While loop and asyncio

美好的一天!

我正在尝试编写 WebSocket 连接器代码并使用 asyncio。我不太熟悉异步方法,因此会发生不正确的行为。下面是代码的简化版本。

import pandas as pd
import json
import websockets
import asyncio
import time

class BinanceQuotesWS:    
    def __init__(self,client,pair):
        self.quotes = pd.DataFrame(columns=['Timestamp','Price'])
        self.pair = pair
        self.socket='wss://fstream.binance.com/ws'
        self.websocket = None
        self.loop = None
        self.result = None
        
    def get_quotes(self):
        return self.quotes
        
    def start(self):
        self.loop = asyncio.get_event_loop()        
        self.result = self.loop.create_task(self.connect())
        
    async def connect(self):
        self.websocket = await websockets.connect(self.socket)
        await self.subscribe_quotes()
        
    async def subscribe_quotes(self):
        subscribe_message = {
        "method": "SUBSCRIBE",
        "params":
        [
         self.pair.lower()+"@trade"
         ],
         "id": 1
         }
        subscribe_message = json.dumps(subscribe_message)
        await self.websocket.send(subscribe_message)
        async for msg in self.websocket:
            msg = json.loads(msg)    
            if('p' in msg):
                self.quotes.loc[0] = [msg['E'],float(msg['p'])]

temp_ws = BinanceQuotesWS(client,'BTCUSDT')
temp_ws.start()

当我在 Jupyter 中测试它并使用 temp_ws.get_quotes() 手动执行一个单元格时,每次都会返回带有新引号的正确数据帧。

虽然在我的程序中我需要一些无限循环并且出现错误。

while(True):
    quotes = temp_ws.get_quotes()
    print(quotes)
    time.sleep(3)

quotes DF 总是空的,但我无法弄清楚为什么(可能是因为 while 循环阻塞了)。如果有人可以帮助解决这个问题,我会很高兴(如果在异步请求方面可以改进代码中的任何其他内容,我会给出一些提示)。谢谢。

您可以使用 asyncio.sleep 创建 async 函数

async def display(self):
    while True:
        await asyncio.sleep(3)
        quotes = self.get_quotes()
        print('time:', quotes['Timestamp'][0], 'price:', quotes['Price'][0])

并将其添加到循环

self.result2 = self.loop.create_task(self.display())

然后你可以运行全部在同一个循环中

temp_ws.loop.run_forever()

如果你不使用 run_forever() 那么它就不会 运行 connect() - 你不会在你的标准循环中获得值。但是这个循环必须一直 运行s 并且它不能与正常循环同时 运行s (它也必须一直 运行 )。循环之一必须 运行 在单独的线程中。

但是 await(whit asyncio.sleep)解决了问题。当它在 while True 中休眠时,它会转到其他函数并且它可以 运行 其他代码 - 后来一些其他代码使用 await 然后它可以返回 while True


也许在 Jupyter 中它可以与 run_forever() 一起使用,因为它们添加了许多额外的功能以使生活更轻松(并且 Jupyter 中使用的元素可能需要此循环才能正常工作)但是在正常程序中,您必须手动使用 run_forever()


最小工作代码:

import pandas as pd
import json
import websockets
import asyncio
import time

class BinanceQuotesWS:

    def __init__(self,client,pair):
        self.quotes = pd.DataFrame(columns=['Timestamp','Price'])
        self.pair = pair
        self.socket='wss://fstream.binance.com/ws'
        self.websocket = None
        self.loop = None
        self.result = None

    def get_quotes(self):
        return self.quotes

    def start(self):
        self.loop = asyncio.get_event_loop()
        self.result = self.loop.create_task(self.connect())
        self.result2 = self.loop.create_task(self.display())

    async def connect(self):
        self.websocket = await websockets.connect(self.socket)
        await self.subscribe_quotes()

    async def subscribe_quotes(self):
        subscribe_message = {
            "method": "SUBSCRIBE",
            "params": [
                 self.pair.lower()+"@trade"
             ],
            "id": 1
        }
         
        subscribe_message = json.dumps(subscribe_message)
        await self.websocket.send(subscribe_message)
        async for msg in self.websocket:
            msg = json.loads(msg)
            if('p' in msg):
                self.quotes.loc[0] = [msg['E'],float(msg['p'])]
                #print(self.quotes)

    async def display(self):
        while True:
            await asyncio.sleep(3)
            quotes = self.get_quotes()
            print('time:', quotes['Timestamp'][0], 'price:', quotes['Price'][0])

client = ''
temp_ws = BinanceQuotesWS(client,'BTCUSDT')
temp_ws.start()
temp_ws.loop.run_forever()