Python socket.io 在本地发出一个事件
Python socket.io locally emit an event
我想发出一个本地事件而不是使用来自服务器的事件。基本上,本地调用事件。原因是要处理数据,然后可以由另一个函数使用。
相关问题:
Can the socket.io client emit events locally? - 这只支持 node.js 而没有 python 实现。
代码:
await self.sio.connect(SOCKET_IP, namespaces=['/'], transports=['websocket'])
await sio.emit("connection")
await sio.emit('authentication', {'token': new_token})
@sio.event
async def auth_err(data):
authErr = True
await sio.disconnect()
@sio.event
async def alive_status(data):
await sio.emit("alive_status_return", "alive")
@sio.on('connected')
async def on_connect(data):
print("Connected")
我确定 sio.emit()
会广播到我不想要的服务器。
考虑做 sio.call()
但最终 await sio.call("on_new_data")
给出 Sending packet MESSAGE data 21["on_new_data"]
那么函数没有获取到数据
@sio.event
async def on_new_data():
print("2")
我最终不得不在 python socket.io 的 asyncio_client.py
中为此
编写一个自定义函数
async def _call_event(self, event, namespace, data=None):
namespace = namespace or '/'
self.logger.info('Received custom event "%s" [%s]', event, namespace)
try:
if data:
r = await self._trigger_event(event, namespace, data)
else:
r = await self._trigger_event(event, namespace)
except Exception as e:
print(traceback.format_exc())
我想发出一个本地事件而不是使用来自服务器的事件。基本上,本地调用事件。原因是要处理数据,然后可以由另一个函数使用。
相关问题:
Can the socket.io client emit events locally? - 这只支持 node.js 而没有 python 实现。
代码:
await self.sio.connect(SOCKET_IP, namespaces=['/'], transports=['websocket'])
await sio.emit("connection")
await sio.emit('authentication', {'token': new_token})
@sio.event
async def auth_err(data):
authErr = True
await sio.disconnect()
@sio.event
async def alive_status(data):
await sio.emit("alive_status_return", "alive")
@sio.on('connected')
async def on_connect(data):
print("Connected")
我确定 sio.emit()
会广播到我不想要的服务器。
考虑做 sio.call()
但最终 await sio.call("on_new_data")
给出 Sending packet MESSAGE data 21["on_new_data"]
那么函数没有获取到数据
@sio.event
async def on_new_data():
print("2")
我最终不得不在 python socket.io 的 asyncio_client.py
中为此
async def _call_event(self, event, namespace, data=None):
namespace = namespace or '/'
self.logger.info('Received custom event "%s" [%s]', event, namespace)
try:
if data:
r = await self._trigger_event(event, namespace, data)
else:
r = await self._trigger_event(event, namespace)
except Exception as e:
print(traceback.format_exc())