在 python asyncio 上发布订阅

Publish subscribe on python asyncio

我觉得我的问题很简单也很愚蠢,但是我看了很多资料,无法想象如何做我想做的事情。

所以,我使用 websockets 库,我有这个算法:

# 1. get connection and start handle it
async def main_request_handler(ws, path):
    proxy = Proxy()
    try:
        await proxy.start(ws, path)

2。在 start 中,我创建了第二个 websocket 以传递来自 ws 的请求并接收答案以将它们发送到 ws

while True:
    request_raw = await self.ws_server.recv()
    await self.process_request_from_server(request_raw)

问题是,我需要使用一个 websocket 服务器连接进行乘法 ws 个客户,我需要向每个人传递来自 ws_server 的相同答案。现在我只得到一个响应,因为 .recv() returns 值仅适用于 'subscribers' 之一。 如何解决这个问题?请注意,我使用 while Trueasync

我不确定我是否理解正确,但是 gathering 多个协同程序不是您想要的吗?

while True:
    request_raw = await self.ws_server.recv()

    # process by multiple clients parallely:
    await asyncio.gather(
        self.process_by_client_1(request_raw),
        self.process_by_client_2(request_raw),
        self.process_by_client_3(request_raw),
    )

这是一个非常简单的例子 pub/sub websockets 服务器

import asyncio
import websockets

connections = set()
n = 0


async def handler(websocket, path):
    global n

    if path == "/sub":
        n = n + 1
        i = n
        connections.add(websocket)
        print("adding subscriber #", i)
        try:
            async for msg in websocket:
                pass  # ignore
        except websockets.ConnectionClosed:
            pass
        finally:
            print("removing subscriber #", i)
            connections.remove(websocket)

    elif path == "/pub":
        async for msg in websocket:
            print("<", msg)
            for ws in connections:
                asyncio.ensure_future(ws.send(msg))


start_server = websockets.serve(handler, 'localhost', 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

以及订阅者客户端示例(运行 其中一些):

import asyncio
import websockets


async def listen():
    async with websockets.connect('ws://localhost:8765/sub') as websocket:
        while True:
            greeting = await websocket.recv()
            print("< {}".format(greeting))


asyncio.get_event_loop().run_until_complete(listen())

和出版商:

import asyncio
import websockets


async def say():
    async with websockets.connect('ws://localhost:8765/pub') as websocket:
        while True:
            msg = input("Enter message:")
            if not msg:
                break
            await websocket.send(msg)


asyncio.get_event_loop().run_until_complete(say())

In other words, I need to run .recv in the same loop and thread with multiple consumers. In RxPy I could just stream.emit(recv_result) and consume items like thatstrem.subscribe(callback_fn), but this is callback way, I need async

您的 subscribe 方法可以接受协程函数,即使用 async def 创建的函数。一旦发出某些东西,就可以实例化它们,并使用 create_task:

生成它们的协程
def __init__(self, ...):
    self._subscribers = []

def subsribe(self, corofn):
    self._subscribers.append(corofn)

def emit(self, obj):
    loop = asyncio.get_event_loop()
    for corofn in self._subscribers:
        coro = corofn(obj)
        loop.create_task(coro)

async def main(self):
    while True:
        request_raw = await self.ws_server.recv()
        self.emit(request_raw)

感谢您的建议,它们可能会奏效。我排了队。

class SWebsocket(object):

    def __init__(self, websocket: WebSocketServerProtocol):
        self.ws = websocket
        self.queues = {}
        self.subscribe()

    def subscribe(self):
        # fire and forget function
        asyncio.ensure_future(self.recv_mess())

    async def recv_mess(self):
        while True:
            try:
                data = await self.ws.recv()
            except websockets.ConnectionClosed as e:
                for _, q in self.queues.items():
                    await q.put(e)
                return
            for _, q in self.queues.items():
                await q.put(data)

    async def recv(self, id):
        # read value from queue
        if id not in self.queues:
            self.queues[id] = asyncio.Queue()
        data = await self.queues[id].get()
        if isinstance(data, websockets.ConnectionClosed):
            raise data
        return data