How can a Django Channels websocket middleware receive messages?
How can a middleware read all websocket messages?
As I understand it, a django-channels middleware looks like this (from https://github.com/django/channels/blob/2a98606c1e0600cbae71ae1f02f31aae5d01f82d/channels/middleware.py ):
    async def coroutine_call(self, inner_instance, scope, receive, send):
        """
        ASGI coroutine; where we can resolve items in the scope
        (but you can't modify it at the top level here!)
        """
        await inner_instance(receive, send)
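I know that if I call await receive() instead of await inner_instance(receive, send) in coroutine_call, I will receive a websocket message, but in that case the websocket handler no longer works.
How can I receive a websocket message and still forward it to the next websocket middleware or handler?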
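For a middleware to intercept messages, it needs to intercept both the receive and send queues.
Here is an example of a complex middleware that does this: https://github.com/hishnash/channelsmultiplexer/blob/master/channelsmultiplexer/demultiplexer.py
A simpler version would be: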
    import asyncio
    import functools

    from channels.utils import await_many_dispatch


    class _InterceptionMiddleware:
        def __init__(self, application_cls, scope):
            self.application = application_cls(scope)

        async def __call__(self, receive, send):
            self.downstream_send = send
            # create a Queue for the upstream consumer to read messages from
            self.upstream_receive_queue = asyncio.Queue()
            # create a Queue for the upstream consumer to write messages to
            self.upstream_send_queue = asyncio.Queue()
            # pipe messages being sent to the upstream consumer to your interceptor method
            receiver = await_many_dispatch([receive], self.my_receive_interceptor_method)
            # pipe messages being sent by the upstream consumer to your interceptor method
            sender = await_many_dispatch(
                [self.upstream_send_queue.get],
                self.my_send_interceptor_method
            )
            # set up asyncio tasks to handle these pipes
            receiver_task = asyncio.create_task(receiver)
            sender_task = asyncio.create_task(sender)
            # create an asyncio task for the upstream consumer
            upstream_task = asyncio.create_task(
                # pass the `get` and `put` methods of your upstream receive and send queues
                self.application(self.upstream_receive_queue.get, self.upstream_send_queue.put)
            )
            # await it all
            done, pending = await asyncio.wait(
                [upstream_task, receiver_task, sender_task],
                # stop as soon as any of them finishes or fails
                return_when=asyncio.FIRST_COMPLETED
            )
            for task in [upstream_task, receiver_task, sender_task]:
                if not task.done():
                    # we need to cancel this task.
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        # we expect this error
                        pass

        async def my_receive_interceptor_method(self, msg):
            # your interception code
            await self.upstream_receive_queue.put(msg)

        async def my_send_interceptor_method(self, msg):
            # your interception code
            await self.downstream_send(msg)


    def InterceptionMiddleware(application_cls):
        return functools.partial(_InterceptionMiddleware, application_cls)
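
If you only need to observe or tweak messages in passing, a lighter approach may be enough: wrap the receive and send callables in closures and hand the wrappers to the inner application. The sketch below assumes ASGI 3-style applications (one callable taking scope, receive and send, as used by Channels 3 and later) rather than the two-step application_cls(scope) pattern above. LoggingInterceptor, echo_app and demo are hypothetical names made up for this illustration, and echo_app is only a stand-in for a real consumer.

```python
import asyncio


class LoggingInterceptor:
    """Hypothetical ASGI 3 middleware that sees every message in both directions."""

    def __init__(self, inner):
        self.inner = inner
        self.seen = []  # (direction, message) pairs, kept only for illustration

    async def __call__(self, scope, receive, send):
        async def wrapped_receive():
            # read the next client message, inspect it, then hand it on unchanged
            msg = await receive()
            self.seen.append(("in", msg))
            return msg

        async def wrapped_send(msg):
            # inspect the outgoing message, then forward it downstream
            self.seen.append(("out", msg))
            await send(msg)

        # the inner app only ever talks to the wrappers
        return await self.inner(scope, wrapped_receive, wrapped_send)


async def echo_app(scope, receive, send):
    """Stand-in for a consumer: echoes text frames until disconnect."""
    while True:
        msg = await receive()
        if msg["type"] == "websocket.disconnect":
            break
        if msg["type"] == "websocket.receive":
            await send({"type": "websocket.send", "text": msg["text"]})


async def demo():
    # fake the server side with a queue of incoming messages
    inbox = asyncio.Queue()
    inbox.put_nowait({"type": "websocket.receive", "text": "hi"})
    inbox.put_nowait({"type": "websocket.disconnect", "code": 1000})
    sent = []

    async def send(msg):
        sent.append(msg)

    app = LoggingInterceptor(echo_app)
    await app({"type": "websocket"}, inbox.get, send)
    return app.seen, sent
```

Calling asyncio.run(demo()) drives one text frame and one disconnect through the wrapper. The inner application still receives every message because the middleware never calls receive() on its own; it only does so on behalf of the inner application. The queue-based version above is still the better fit when the middleware has to inject, drop or reorder messages independently of the consumer.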