如何用aioredis实现单生产者多消费者pub/sub
How to implement single-producer multi-consumer with aioredis pub/sub
我有网络应用程序。该应用程序具有将一些对象数据推送到 redis
通道的端点。
另一个端点处理 websocket
连接,其中从通道获取数据并通过 ws
发送到客户端。
当我通过 ws 连接时,消息仅获取第一个连接的客户端。
如何使用多个客户端从 redis
频道读取消息而不创建新订阅?
Websocket 处理程序。
我在这里订阅频道,将其保存到应用程序 (init_tram_channel
)。然后 运行 我收听频道和发送消息的工作 (run_tram_listening
)。
@routes.get('/tram-state-ws/{tram_id}')
async def tram_ws(request: web.Request):
ws = web.WebSocketResponse()
await ws.prepare(request)
tram_id = int(request.match_info['tram_id'])
channel_name = f'tram_{tram_id}'
await init_tram_channel(channel_name, request.app)
tram_job = await run_tram_listening(
request=request,
ws=ws,
channel=request.app['tram_producers'][channel_name]
)
request.app['websockets'].add(ws)
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
break
if msg.type == aiohttp.WSMsgType.ERROR:
logging.error(f'ws connection was closed with exception {ws.exception()}')
else:
await asyncio.sleep(0.005)
except asyncio.CancelledError:
pass
finally:
await tram_job.close()
request.app['websockets'].discard(ws)
return ws
正在订阅并保存频道。
每个通道都与唯一对象相关,为了不创建与同一对象相关的多个通道,我只将一个通道保存到应用程序。
app['tram_producers']
是字典。
async def init_tram_channel(
channel_name: str,
app: web.Application
):
if channel_name not in app['tram_producers']:
channel, = await app['redis'].subscribe(channel_name)
app['tram_producers'][channel_name] = channel
运行 用于收听频道的 coro。
我 运行 通过 aiojobs:
async def run_tram_listening(
request: web.Request,
ws: web.WebSocketResponse,
channel: Channel
):
"""
:return: aiojobs._job.Job object
"""
listen_redis_job = await spawn(
request,
_read_tram_subscription(
ws,
channel
)
)
return listen_redis_job
Coro 我在哪里收听和发送消息:
async def _read_tram_subscription(
ws: web.WebSocketResponse,
channel: Channel
):
try:
async for msg in channel.iter():
tram_data = msg.decode()
await ws.send_json(tram_data)
except asyncio.CancelledError:
pass
except Exception as e:
logging.error(msg=e, exc_info=e)
我猜一条消息只会从一个 Redis 订阅中收到一次,如果您的应用程序中有多个侦听器,那么只有其中一个会收到消息。
因此您需要在应用程序内部创建类似于 mini pub/sub 的东西,以将消息分发给所有侦听器(在本例中为 websocket 连接)。
前段时间我做了一个 aiohttp websocket 聊天示例 - 不是用 Redis,但至少有跨 websocket 分布:https://github.com/messa/aiohttp-nextjs-demo-chat/blob/master/chat_web/views/api.py
关键是要有一个应用程序范围的 message_subcriptions
,其中每个 websocket 连接都注册自己,或者可能是它自己的 asyncio.Queue(我在示例中使用了 Event,但这是次优的) ,每当消息来自 Redis 时,它都会被推送到所有相关队列。
当然,当 websocket 连接结束时(客户端取消订阅、断开连接、失败...),队列应该被删除(如果它是最后一个监听它的连接,则可能取消 Redis 订阅)。
Asyncio 并不意味着我们应该忘记队列 :) 熟悉一次组合多个任务也很好(从 websocket 读取,从消息队列读取,也许从某个通知队列读取...)。使用队列还可以帮助您更干净地处理客户端重新连接(不会丢失任何消息)。
以下代码已在某些 aioredis github 问题中找到(我已将其用于我的任务)。
class TramProducer:
def __init__(self, channel: aioredis.Channel):
self._future = None
self._channel = channel
def __aiter__(self):
return self
def __anext__(self):
return asyncio.shield(self._get_message())
async def _get_message(self):
if self._future:
return await self._future
self._future = asyncio.get_event_loop().create_future()
message = await self._channel.get_json()
future, self._future = self._future, None
future.set_result(message)
return message
那么,它是如何运作的? TramProducer 包装了我们获取消息的方式。
正如@Messa所说
message is received from one Redis subscription only once.
所以只有 TramProducer 的一个客户端正在从 redis 中检索消息,而其他客户端正在等待从通道接收消息后设置的未来结果。
如果self._future
初始化了,说明有人在等待redis的消息,所以我们就等self._future
结果。
TramProducer 用法(我从我的问题中举了一个例子):
async def _read_tram_subscription(
ws: web.WebSocketResponse,
tram_producer: TramProducer
):
try:
async for msg in tram_producer:
await ws.send_json(msg)
except asyncio.CancelledError:
pass
except Exception as e:
logging.error(msg=e, exc_info=e)
TramProducer 初始化:
async def init_tram_channel(
channel_name: str,
app: web.Application
):
if channel_name not in app['tram_producers']:
channel, = await app['redis'].subscribe(channel_name)
app['tram_producers'][channel_name] = TramProducer(channel)
我认为这可能对某些人有帮助。
完整项目在这里 https://gitlab.com/tram-emulator/tram-server
我有网络应用程序。该应用程序具有将一些对象数据推送到 redis
通道的端点。
另一个端点处理 websocket
连接,其中从通道获取数据并通过 ws
发送到客户端。
当我通过 ws 连接时,消息仅获取第一个连接的客户端。
如何使用多个客户端从 redis
频道读取消息而不创建新订阅?
Websocket 处理程序。
我在这里订阅频道,将其保存到应用程序 (init_tram_channel
)。然后 运行 我收听频道和发送消息的工作 (run_tram_listening
)。
@routes.get('/tram-state-ws/{tram_id}')
async def tram_ws(request: web.Request):
ws = web.WebSocketResponse()
await ws.prepare(request)
tram_id = int(request.match_info['tram_id'])
channel_name = f'tram_{tram_id}'
await init_tram_channel(channel_name, request.app)
tram_job = await run_tram_listening(
request=request,
ws=ws,
channel=request.app['tram_producers'][channel_name]
)
request.app['websockets'].add(ws)
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
break
if msg.type == aiohttp.WSMsgType.ERROR:
logging.error(f'ws connection was closed with exception {ws.exception()}')
else:
await asyncio.sleep(0.005)
except asyncio.CancelledError:
pass
finally:
await tram_job.close()
request.app['websockets'].discard(ws)
return ws
正在订阅并保存频道。
每个通道都与唯一对象相关,为了不创建与同一对象相关的多个通道,我只将一个通道保存到应用程序。
app['tram_producers']
是字典。
async def init_tram_channel(
channel_name: str,
app: web.Application
):
if channel_name not in app['tram_producers']:
channel, = await app['redis'].subscribe(channel_name)
app['tram_producers'][channel_name] = channel
运行 用于收听频道的 coro。 我 运行 通过 aiojobs:
async def run_tram_listening(
request: web.Request,
ws: web.WebSocketResponse,
channel: Channel
):
"""
:return: aiojobs._job.Job object
"""
listen_redis_job = await spawn(
request,
_read_tram_subscription(
ws,
channel
)
)
return listen_redis_job
Coro 我在哪里收听和发送消息:
async def _read_tram_subscription(
ws: web.WebSocketResponse,
channel: Channel
):
try:
async for msg in channel.iter():
tram_data = msg.decode()
await ws.send_json(tram_data)
except asyncio.CancelledError:
pass
except Exception as e:
logging.error(msg=e, exc_info=e)
我猜一条消息只会从一个 Redis 订阅中收到一次,如果您的应用程序中有多个侦听器,那么只有其中一个会收到消息。
因此您需要在应用程序内部创建类似于 mini pub/sub 的东西,以将消息分发给所有侦听器(在本例中为 websocket 连接)。
前段时间我做了一个 aiohttp websocket 聊天示例 - 不是用 Redis,但至少有跨 websocket 分布:https://github.com/messa/aiohttp-nextjs-demo-chat/blob/master/chat_web/views/api.py
关键是要有一个应用程序范围的 message_subcriptions
,其中每个 websocket 连接都注册自己,或者可能是它自己的 asyncio.Queue(我在示例中使用了 Event,但这是次优的) ,每当消息来自 Redis 时,它都会被推送到所有相关队列。
当然,当 websocket 连接结束时(客户端取消订阅、断开连接、失败...),队列应该被删除(如果它是最后一个监听它的连接,则可能取消 Redis 订阅)。
Asyncio 并不意味着我们应该忘记队列 :) 熟悉一次组合多个任务也很好(从 websocket 读取,从消息队列读取,也许从某个通知队列读取...)。使用队列还可以帮助您更干净地处理客户端重新连接(不会丢失任何消息)。
以下代码已在某些 aioredis github 问题中找到(我已将其用于我的任务)。
class TramProducer:
def __init__(self, channel: aioredis.Channel):
self._future = None
self._channel = channel
def __aiter__(self):
return self
def __anext__(self):
return asyncio.shield(self._get_message())
async def _get_message(self):
if self._future:
return await self._future
self._future = asyncio.get_event_loop().create_future()
message = await self._channel.get_json()
future, self._future = self._future, None
future.set_result(message)
return message
那么,它是如何运作的? TramProducer 包装了我们获取消息的方式。
正如@Messa所说
message is received from one Redis subscription only once.
所以只有 TramProducer 的一个客户端正在从 redis 中检索消息,而其他客户端正在等待从通道接收消息后设置的未来结果。
如果self._future
初始化了,说明有人在等待redis的消息,所以我们就等self._future
结果。
TramProducer 用法(我从我的问题中举了一个例子):
async def _read_tram_subscription(
ws: web.WebSocketResponse,
tram_producer: TramProducer
):
try:
async for msg in tram_producer:
await ws.send_json(msg)
except asyncio.CancelledError:
pass
except Exception as e:
logging.error(msg=e, exc_info=e)
TramProducer 初始化:
async def init_tram_channel(
channel_name: str,
app: web.Application
):
if channel_name not in app['tram_producers']:
channel, = await app['redis'].subscribe(channel_name)
app['tram_producers'][channel_name] = TramProducer(channel)
我认为这可能对某些人有帮助。
完整项目在这里 https://gitlab.com/tram-emulator/tram-server