在 python asyncio 上发布订阅
Publish subscribe on python asyncio
我觉得我的问题很简单也很愚蠢,但是我看了很多资料,无法想象如何做我想做的事情。
所以,我使用 websockets
库,我有这个算法:
# 1. get connection and start handle it
async def main_request_handler(ws, path):
proxy = Proxy()
try:
await proxy.start(ws, path)
2。在 start 中,我创建了第二个 websocket 以传递来自 ws
的请求并接收答案以将它们发送到 ws
while True:
request_raw = await self.ws_server.recv()
await self.process_request_from_server(request_raw)
问题是,我需要使用一个 websocket 服务器连接进行乘法
ws
个客户,我需要向每个人传递来自 ws_server
的相同答案。现在我只得到一个响应,因为 .recv() returns 值仅适用于 'subscribers' 之一。
如何解决这个问题?请注意,我使用 while True
和 async
我不确定我是否理解正确,但是 gathering 多个协同程序不是您想要的吗?
while True:
request_raw = await self.ws_server.recv()
# process by multiple clients parallely:
await asyncio.gather(
self.process_by_client_1(request_raw),
self.process_by_client_2(request_raw),
self.process_by_client_3(request_raw),
)
这是一个非常简单的例子 pub/sub websockets 服务器
import asyncio
import websockets
connections = set()
n = 0
async def handler(websocket, path):
global n
if path == "/sub":
n = n + 1
i = n
connections.add(websocket)
print("adding subscriber #", i)
try:
async for msg in websocket:
pass # ignore
except websockets.ConnectionClosed:
pass
finally:
print("removing subscriber #", i)
connections.remove(websocket)
elif path == "/pub":
async for msg in websocket:
print("<", msg)
for ws in connections:
asyncio.ensure_future(ws.send(msg))
start_server = websockets.serve(handler, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
以及订阅者客户端示例(运行 其中一些):
import asyncio
import websockets
async def listen():
async with websockets.connect('ws://localhost:8765/sub') as websocket:
while True:
greeting = await websocket.recv()
print("< {}".format(greeting))
asyncio.get_event_loop().run_until_complete(listen())
和出版商:
import asyncio
import websockets
async def say():
async with websockets.connect('ws://localhost:8765/pub') as websocket:
while True:
msg = input("Enter message:")
if not msg:
break
await websocket.send(msg)
asyncio.get_event_loop().run_until_complete(say())
In other words, I need to run .recv
in the same loop and thread with multiple consumers. In RxPy I could just stream.emit(recv_result)
and consume items like thatstrem.subscribe(callback_fn)
, but this is callback way, I need async
您的 subscribe
方法可以接受协程函数,即使用 async def
创建的函数。一旦发出某些东西,就可以实例化它们,并使用 create_task
:
生成它们的协程
def __init__(self, ...):
self._subscribers = []
def subsribe(self, corofn):
self._subscribers.append(corofn)
def emit(self, obj):
loop = asyncio.get_event_loop()
for corofn in self._subscribers:
coro = corofn(obj)
loop.create_task(coro)
async def main(self):
while True:
request_raw = await self.ws_server.recv()
self.emit(request_raw)
感谢您的建议,它们可能会奏效。我排了队。
class SWebsocket(object):
def __init__(self, websocket: WebSocketServerProtocol):
self.ws = websocket
self.queues = {}
self.subscribe()
def subscribe(self):
# fire and forget function
asyncio.ensure_future(self.recv_mess())
async def recv_mess(self):
while True:
try:
data = await self.ws.recv()
except websockets.ConnectionClosed as e:
for _, q in self.queues.items():
await q.put(e)
return
for _, q in self.queues.items():
await q.put(data)
async def recv(self, id):
# read value from queue
if id not in self.queues:
self.queues[id] = asyncio.Queue()
data = await self.queues[id].get()
if isinstance(data, websockets.ConnectionClosed):
raise data
return data
我觉得我的问题很简单也很愚蠢,但是我看了很多资料,无法想象如何做我想做的事情。
所以,我使用 websockets
库,我有这个算法:
# 1. get connection and start handle it
async def main_request_handler(ws, path):
proxy = Proxy()
try:
await proxy.start(ws, path)
2。在 start 中,我创建了第二个 websocket 以传递来自 ws
的请求并接收答案以将它们发送到 ws
while True:
request_raw = await self.ws_server.recv()
await self.process_request_from_server(request_raw)
问题是,我需要使用一个 websocket 服务器连接进行乘法
ws
个客户,我需要向每个人传递来自 ws_server
的相同答案。现在我只得到一个响应,因为 .recv() returns 值仅适用于 'subscribers' 之一。
如何解决这个问题?请注意,我使用 while True
和 async
我不确定我是否理解正确,但是 gathering 多个协同程序不是您想要的吗?
while True:
request_raw = await self.ws_server.recv()
# process by multiple clients parallely:
await asyncio.gather(
self.process_by_client_1(request_raw),
self.process_by_client_2(request_raw),
self.process_by_client_3(request_raw),
)
这是一个非常简单的例子 pub/sub websockets 服务器
import asyncio
import websockets
connections = set()
n = 0
async def handler(websocket, path):
global n
if path == "/sub":
n = n + 1
i = n
connections.add(websocket)
print("adding subscriber #", i)
try:
async for msg in websocket:
pass # ignore
except websockets.ConnectionClosed:
pass
finally:
print("removing subscriber #", i)
connections.remove(websocket)
elif path == "/pub":
async for msg in websocket:
print("<", msg)
for ws in connections:
asyncio.ensure_future(ws.send(msg))
start_server = websockets.serve(handler, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
以及订阅者客户端示例(运行 其中一些):
import asyncio
import websockets
async def listen():
async with websockets.connect('ws://localhost:8765/sub') as websocket:
while True:
greeting = await websocket.recv()
print("< {}".format(greeting))
asyncio.get_event_loop().run_until_complete(listen())
和出版商:
import asyncio
import websockets
async def say():
async with websockets.connect('ws://localhost:8765/pub') as websocket:
while True:
msg = input("Enter message:")
if not msg:
break
await websocket.send(msg)
asyncio.get_event_loop().run_until_complete(say())
In other words, I need to run
.recv
in the same loop and thread with multiple consumers. In RxPy I could juststream.emit(recv_result)
and consume items likethatstrem.subscribe(callback_fn)
, but this is callback way, I need async
您的 subscribe
方法可以接受协程函数,即使用 async def
创建的函数。一旦发出某些东西,就可以实例化它们,并使用 create_task
:
def __init__(self, ...):
self._subscribers = []
def subsribe(self, corofn):
self._subscribers.append(corofn)
def emit(self, obj):
loop = asyncio.get_event_loop()
for corofn in self._subscribers:
coro = corofn(obj)
loop.create_task(coro)
async def main(self):
while True:
request_raw = await self.ws_server.recv()
self.emit(request_raw)
感谢您的建议,它们可能会奏效。我排了队。
class SWebsocket(object):
def __init__(self, websocket: WebSocketServerProtocol):
self.ws = websocket
self.queues = {}
self.subscribe()
def subscribe(self):
# fire and forget function
asyncio.ensure_future(self.recv_mess())
async def recv_mess(self):
while True:
try:
data = await self.ws.recv()
except websockets.ConnectionClosed as e:
for _, q in self.queues.items():
await q.put(e)
return
for _, q in self.queues.items():
await q.put(data)
async def recv(self, id):
# read value from queue
if id not in self.queues:
self.queues[id] = asyncio.Queue()
data = await self.queues[id].get()
if isinstance(data, websockets.ConnectionClosed):
raise data
return data