while循环和asyncio
While loop and asyncio
美好的一天!
我正在尝试编写 WebSocket 连接器代码并使用 asyncio。我不太熟悉异步方法,因此会发生不正确的行为。下面是代码的简化版本。
import pandas as pd
import json
import websockets
import asyncio
import time
class BinanceQuotesWS:
def __init__(self,client,pair):
self.quotes = pd.DataFrame(columns=['Timestamp','Price'])
self.pair = pair
self.socket='wss://fstream.binance.com/ws'
self.websocket = None
self.loop = None
self.result = None
def get_quotes(self):
return self.quotes
def start(self):
self.loop = asyncio.get_event_loop()
self.result = self.loop.create_task(self.connect())
async def connect(self):
self.websocket = await websockets.connect(self.socket)
await self.subscribe_quotes()
async def subscribe_quotes(self):
subscribe_message = {
"method": "SUBSCRIBE",
"params":
[
self.pair.lower()+"@trade"
],
"id": 1
}
subscribe_message = json.dumps(subscribe_message)
await self.websocket.send(subscribe_message)
async for msg in self.websocket:
msg = json.loads(msg)
if('p' in msg):
self.quotes.loc[0] = [msg['E'],float(msg['p'])]
temp_ws = BinanceQuotesWS(client,'BTCUSDT')
temp_ws.start()
当我在 Jupyter 中测试它并使用 temp_ws.get_quotes()
手动执行一个单元格时,每次都会返回带有新引号的正确数据帧。
虽然在我的程序中我需要一些无限循环并且出现错误。
while(True):
quotes = temp_ws.get_quotes()
print(quotes)
time.sleep(3)
quotes
DF 总是空的,但我无法弄清楚为什么(可能是因为 while 循环阻塞了)。如果有人可以帮助解决这个问题,我会很高兴(如果在异步请求方面可以改进代码中的任何其他内容,我会给出一些提示)。谢谢。
您可以使用 asyncio.sleep
创建 async
函数
async def display(self):
while True:
await asyncio.sleep(3)
quotes = self.get_quotes()
print('time:', quotes['Timestamp'][0], 'price:', quotes['Price'][0])
并将其添加到循环
self.result2 = self.loop.create_task(self.display())
然后你可以运行全部在同一个循环中
temp_ws.loop.run_forever()
如果你不使用 run_forever()
那么它就不会 运行 connect()
- 你不会在你的标准循环中获得值。但是这个循环必须一直 运行s 并且它不能与正常循环同时 运行s (它也必须一直 运行 )。循环之一必须 运行 在单独的线程中。
但是 await
(whit asyncio.sleep
)解决了问题。当它在 while True
中休眠时,它会转到其他函数并且它可以 运行 其他代码 - 后来一些其他代码使用 await
然后它可以返回 while True
。
也许在 Jupyter
中它可以与 run_forever()
一起使用,因为它们添加了许多额外的功能以使生活更轻松(并且 Jupyter
中使用的元素可能需要此循环才能正常工作)但是在正常程序中,您必须手动使用 run_forever()
。
最小工作代码:
import pandas as pd
import json
import websockets
import asyncio
import time
class BinanceQuotesWS:
def __init__(self,client,pair):
self.quotes = pd.DataFrame(columns=['Timestamp','Price'])
self.pair = pair
self.socket='wss://fstream.binance.com/ws'
self.websocket = None
self.loop = None
self.result = None
def get_quotes(self):
return self.quotes
def start(self):
self.loop = asyncio.get_event_loop()
self.result = self.loop.create_task(self.connect())
self.result2 = self.loop.create_task(self.display())
async def connect(self):
self.websocket = await websockets.connect(self.socket)
await self.subscribe_quotes()
async def subscribe_quotes(self):
subscribe_message = {
"method": "SUBSCRIBE",
"params": [
self.pair.lower()+"@trade"
],
"id": 1
}
subscribe_message = json.dumps(subscribe_message)
await self.websocket.send(subscribe_message)
async for msg in self.websocket:
msg = json.loads(msg)
if('p' in msg):
self.quotes.loc[0] = [msg['E'],float(msg['p'])]
#print(self.quotes)
async def display(self):
while True:
await asyncio.sleep(3)
quotes = self.get_quotes()
print('time:', quotes['Timestamp'][0], 'price:', quotes['Price'][0])
client = ''
temp_ws = BinanceQuotesWS(client,'BTCUSDT')
temp_ws.start()
temp_ws.loop.run_forever()
美好的一天!
我正在尝试编写 WebSocket 连接器代码并使用 asyncio。我不太熟悉异步方法,因此会发生不正确的行为。下面是代码的简化版本。
import pandas as pd
import json
import websockets
import asyncio
import time
class BinanceQuotesWS:
def __init__(self,client,pair):
self.quotes = pd.DataFrame(columns=['Timestamp','Price'])
self.pair = pair
self.socket='wss://fstream.binance.com/ws'
self.websocket = None
self.loop = None
self.result = None
def get_quotes(self):
return self.quotes
def start(self):
self.loop = asyncio.get_event_loop()
self.result = self.loop.create_task(self.connect())
async def connect(self):
self.websocket = await websockets.connect(self.socket)
await self.subscribe_quotes()
async def subscribe_quotes(self):
subscribe_message = {
"method": "SUBSCRIBE",
"params":
[
self.pair.lower()+"@trade"
],
"id": 1
}
subscribe_message = json.dumps(subscribe_message)
await self.websocket.send(subscribe_message)
async for msg in self.websocket:
msg = json.loads(msg)
if('p' in msg):
self.quotes.loc[0] = [msg['E'],float(msg['p'])]
temp_ws = BinanceQuotesWS(client,'BTCUSDT')
temp_ws.start()
当我在 Jupyter 中测试它并使用 temp_ws.get_quotes()
手动执行一个单元格时,每次都会返回带有新引号的正确数据帧。
虽然在我的程序中我需要一些无限循环并且出现错误。
while(True):
quotes = temp_ws.get_quotes()
print(quotes)
time.sleep(3)
quotes
DF 总是空的,但我无法弄清楚为什么(可能是因为 while 循环阻塞了)。如果有人可以帮助解决这个问题,我会很高兴(如果在异步请求方面可以改进代码中的任何其他内容,我会给出一些提示)。谢谢。
您可以使用 asyncio.sleep
创建 async
函数
async def display(self):
while True:
await asyncio.sleep(3)
quotes = self.get_quotes()
print('time:', quotes['Timestamp'][0], 'price:', quotes['Price'][0])
并将其添加到循环
self.result2 = self.loop.create_task(self.display())
然后你可以运行全部在同一个循环中
temp_ws.loop.run_forever()
如果你不使用 run_forever()
那么它就不会 运行 connect()
- 你不会在你的标准循环中获得值。但是这个循环必须一直 运行s 并且它不能与正常循环同时 运行s (它也必须一直 运行 )。循环之一必须 运行 在单独的线程中。
但是 await
(whit asyncio.sleep
)解决了问题。当它在 while True
中休眠时,它会转到其他函数并且它可以 运行 其他代码 - 后来一些其他代码使用 await
然后它可以返回 while True
。
也许在 Jupyter
中它可以与 run_forever()
一起使用,因为它们添加了许多额外的功能以使生活更轻松(并且 Jupyter
中使用的元素可能需要此循环才能正常工作)但是在正常程序中,您必须手动使用 run_forever()
。
最小工作代码:
import pandas as pd
import json
import websockets
import asyncio
import time
class BinanceQuotesWS:
def __init__(self,client,pair):
self.quotes = pd.DataFrame(columns=['Timestamp','Price'])
self.pair = pair
self.socket='wss://fstream.binance.com/ws'
self.websocket = None
self.loop = None
self.result = None
def get_quotes(self):
return self.quotes
def start(self):
self.loop = asyncio.get_event_loop()
self.result = self.loop.create_task(self.connect())
self.result2 = self.loop.create_task(self.display())
async def connect(self):
self.websocket = await websockets.connect(self.socket)
await self.subscribe_quotes()
async def subscribe_quotes(self):
subscribe_message = {
"method": "SUBSCRIBE",
"params": [
self.pair.lower()+"@trade"
],
"id": 1
}
subscribe_message = json.dumps(subscribe_message)
await self.websocket.send(subscribe_message)
async for msg in self.websocket:
msg = json.loads(msg)
if('p' in msg):
self.quotes.loc[0] = [msg['E'],float(msg['p'])]
#print(self.quotes)
async def display(self):
while True:
await asyncio.sleep(3)
quotes = self.get_quotes()
print('time:', quotes['Timestamp'][0], 'price:', quotes['Price'][0])
client = ''
temp_ws = BinanceQuotesWS(client,'BTCUSDT')
temp_ws.start()
temp_ws.loop.run_forever()