OOP Python 网络套接字
OOP Python websockets
我想将 python websockets 包的功能封装到 class 中,代表一个传感器协调器。这样做的目的是允许我创建一个协调器对象,并且只让服务器在需要时持续存在。不幸的是,我无法在网上找到任何类似的例子,并且到目前为止一直在努力。
我的代码如下:
import asyncio
import json
import logging
import websockets
logging.basicConfig()
class Coordinator(object):
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.running = False
self.server = None
self.sensors = set()
def __enter__(self):
self.server = websockets.serve((self.ws_handler, self.host, self.port))
self.running = True
def __exit__(self, exc_type, exc_val, exc_tb):
# Gracefully stop serving
self.running = False
pass
def sensors_event(self):
return json.dumps({'type': 'sensors', 'count': len(self.sensors)})
async def notify_sensors(self):
if self.sensors:
message = self.sensors_event()
await asyncio.wait([user.send(message) for user in self.sensors])
async def register(self, websocket):
self.sensors.add(websocket)
await self.notify_sensors()
async def unregister(self, websocket):
self.sensors.remove(websocket)
await self.notify_sensors()
async def ws_handler(self, websocket):
try:
await self.register(websocket)
pass
finally:
await self.unregister(websocket)
if __name__ == '__main__':
with Coordinator() as coordinator:
pass
目前看来 websocket 服务器没有启动,因为它在 netstat 上不可见。
是否可以 运行 服务器在一个单独的(恶魔化的)线程中,由协调器对象持有?
谢谢
您的代码有两个问题。
你永远不会启动 asyncio 主循环,所以 asyncio 永远没有机会 运行。换句话说,您需要在代码中的某处包含 loop.run_until_complete(x)
。
start_server
是一个协程,所以你必须等待它。
代码的固定版本(但未经测试)可能如下所示:
class Coordinator(object):
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.running = False
self.server = None
self.sensors = set()
async def __aenter__(self):
self.server = await websockets.serve((self.ws_handler, self.host, self.port))
self.running = True
def __aexit__(self, exc_type, exc_val, exc_tb):
# Gracefully stop serving
self.running = False
def sensors_event(self):
return json.dumps({'type': 'sensors', 'count': len(self.sensors)})
async def notify_sensors(self):
if self.sensors:
message = self.sensors_event()
await asyncio.wait([user.send(message) for user in self.sensors])
async def register(self, websocket):
self.sensors.add(websocket)
await self.notify_sensors()
async def unregister(self, websocket):
self.sensors.remove(websocket)
await self.notify_sensors()
async def ws_handler(self, websocket):
try:
await self.register(websocket)
finally:
await self.unregister(websocket)
async def main():
async with Coordinator() as coordinator:
pass
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
如果您花时间了解 a tutorial 涵盖基本异步概念的内容,例如 运行 主循环,使用 asyncio 会容易得多。
The websockets.server
module defines a simple WebSocket
server API.
serve()
returns an awaitable. Awaiting it yields an instance of
WebSocketServer
which provides close()
and wait_closed()
methods
for terminating the server and cleaning up its resources.
On Python ≥ 3.5, serve()
can also be used as an asynchronous context manager.
In this case, the server is shut down when exiting the context.
正如@user4815162342 已经确定的那样,主要问题是您不等待对 serve()
协程的调用。
由于您使用的是 Python v3.6.8,因此您可以使用异步上下文管理器来简化实施。这样做的好处是您无需担心处理关机,因为它是自动处理的。这是一个简单的回显服务器的面向对象的实现。
import asyncio
import signal
import websockets
class Server(object):
def __init__(self, host, port):
self.host, self.port = host, port
self.loop = asyncio.get_event_loop()
self.stop = self.loop.create_future()
self.loop.add_signal_handler(signal.SIGINT, self.stop.set_result, None)
self.loop.run_until_complete(self.server())
async def server(self):
async with websockets.serve(self.ws_handler, self.host, self.port):
await self.stop
async def ws_handler(self, websocket, path):
msg = await websocket.recv()
print(f'Received: {msg}')
await websocket.send(msg)
print(f'Sending: {msg}')
if __name__ == '__main__':
server = Server(host='localhost', port=6789)
目前,这将 运行 直到用户发送中断,但您可以调整 stop
未来以适应。
我想将 python websockets 包的功能封装到 class 中,代表一个传感器协调器。这样做的目的是允许我创建一个协调器对象,并且只让服务器在需要时持续存在。不幸的是,我无法在网上找到任何类似的例子,并且到目前为止一直在努力。
我的代码如下:
import asyncio
import json
import logging
import websockets
logging.basicConfig()
class Coordinator(object):
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.running = False
self.server = None
self.sensors = set()
def __enter__(self):
self.server = websockets.serve((self.ws_handler, self.host, self.port))
self.running = True
def __exit__(self, exc_type, exc_val, exc_tb):
# Gracefully stop serving
self.running = False
pass
def sensors_event(self):
return json.dumps({'type': 'sensors', 'count': len(self.sensors)})
async def notify_sensors(self):
if self.sensors:
message = self.sensors_event()
await asyncio.wait([user.send(message) for user in self.sensors])
async def register(self, websocket):
self.sensors.add(websocket)
await self.notify_sensors()
async def unregister(self, websocket):
self.sensors.remove(websocket)
await self.notify_sensors()
async def ws_handler(self, websocket):
try:
await self.register(websocket)
pass
finally:
await self.unregister(websocket)
if __name__ == '__main__':
with Coordinator() as coordinator:
pass
目前看来 websocket 服务器没有启动,因为它在 netstat 上不可见。
是否可以 运行 服务器在一个单独的(恶魔化的)线程中,由协调器对象持有?
谢谢
您的代码有两个问题。
你永远不会启动 asyncio 主循环,所以 asyncio 永远没有机会 运行。换句话说,您需要在代码中的某处包含
loop.run_until_complete(x)
。start_server
是一个协程,所以你必须等待它。
代码的固定版本(但未经测试)可能如下所示:
class Coordinator(object):
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.running = False
self.server = None
self.sensors = set()
async def __aenter__(self):
self.server = await websockets.serve((self.ws_handler, self.host, self.port))
self.running = True
def __aexit__(self, exc_type, exc_val, exc_tb):
# Gracefully stop serving
self.running = False
def sensors_event(self):
return json.dumps({'type': 'sensors', 'count': len(self.sensors)})
async def notify_sensors(self):
if self.sensors:
message = self.sensors_event()
await asyncio.wait([user.send(message) for user in self.sensors])
async def register(self, websocket):
self.sensors.add(websocket)
await self.notify_sensors()
async def unregister(self, websocket):
self.sensors.remove(websocket)
await self.notify_sensors()
async def ws_handler(self, websocket):
try:
await self.register(websocket)
finally:
await self.unregister(websocket)
async def main():
async with Coordinator() as coordinator:
pass
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
如果您花时间了解 a tutorial 涵盖基本异步概念的内容,例如 运行 主循环,使用 asyncio 会容易得多。
The
websockets.server
module defines a simpleWebSocket
server API.
serve()
returns an awaitable. Awaiting it yields an instance ofWebSocketServer
which providesclose()
andwait_closed()
methods for terminating the server and cleaning up its resources.On Python ≥ 3.5,
serve()
can also be used as an asynchronous context manager. In this case, the server is shut down when exiting the context.
正如@user4815162342 已经确定的那样,主要问题是您不等待对 serve()
协程的调用。
由于您使用的是 Python v3.6.8,因此您可以使用异步上下文管理器来简化实施。这样做的好处是您无需担心处理关机,因为它是自动处理的。这是一个简单的回显服务器的面向对象的实现。
import asyncio
import signal
import websockets
class Server(object):
def __init__(self, host, port):
self.host, self.port = host, port
self.loop = asyncio.get_event_loop()
self.stop = self.loop.create_future()
self.loop.add_signal_handler(signal.SIGINT, self.stop.set_result, None)
self.loop.run_until_complete(self.server())
async def server(self):
async with websockets.serve(self.ws_handler, self.host, self.port):
await self.stop
async def ws_handler(self, websocket, path):
msg = await websocket.recv()
print(f'Received: {msg}')
await websocket.send(msg)
print(f'Sending: {msg}')
if __name__ == '__main__':
server = Server(host='localhost', port=6789)
目前,这将 运行 直到用户发送中断,但您可以调整 stop
未来以适应。