Python websockets,无法接收消息

Python websockets, unable to receive messages

我正在使用来自父线程的 websockets in an python project i'm working on. The websocket is being run in an thread and given 2 queue's。我正在使用 javascript 连接到 websocket 服务器。

我能够通过self.ssi.get(True) 从父线程获取消息并将它们传递给 javascript websocket 客户端。

但是我无法接收来自客户端的消息。当我使用 zaproxy 时,我可以看到正在通过的消息。在 websocket 服务器上,我还能够看到数据包到达接口。 Python 不会抛出任何错误,并且 logger.setLevel(logging.DEBUG) 不会显示消息到达的方式与我能够看到正在发送的消息相同。

我一直在努力解决这个问题,但我 运行 找不到问题的主意,欢迎任何帮助。

Python 网络套接字服务器:

import websockets
import logging
import asyncio
import ssl

class websocket:
    def __init__(self,ssi,sso):

        self.ssi = ssi
        self.sso = sso

        logger = logging.getLogger('websockets')
        logger.setLevel(logging.DEBUG)
        # logger.addHandler(logging.FileHandler('debug.log'))
        logger.addHandler(logging.StreamHandler())

        sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        sslc.load_cert_chain(
            'keys/wss.server.crt',
            'keys/wss.server.key')

        loop = asyncio.new_event_loop()
        wsrv = websockets.serve(
            self.handler,
            host='0.0.0.0',
            port=9000,
            ssl=sslc,
            loop=loop)

        loop.run_until_complete(wsrv)
        loop.run_forever()

    async def handler(self, wss, path):
        consumer_task = asyncio.ensure_future(self.consumerHandler(wss, path))
        producer_task = asyncio.ensure_future(self.producerHandler(wss, path))
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED,)
        for task in pending:
            task.cancel()

    async def producerHandler(self, wss, path):
        while True:
            msg = await self.producer()
            await wss.send(str(msg))

    async def consumerHandler(self, wss, path):
        async for msg in wss:
            await self.consumer(msg)

    async def producer(self):
        return self.ssi.get(True)

    async def consumer(self, msg):
        self.sso.put(msg.data)

Javascript 客户:

var ws;

function ws_init() {

    ws = new WebSocket("wss://pri.local:9000/");

    ws.onopen = function(e) {
        output("connected");
    };

    ws.onmessage = function(e) {
        output("i: " + e.data);
    };

    ws.onclose = function() {
        output("disconnect");
    };

    ws.onerror = function(e) {
        output("onerror");
        console.log(e)
    };

}

function onSubmit() {
    var input = document.getElementById("input");
    ws.send(input.value);
    output("o: " + input.value);
    input.value = "";
    input.focus();
}

function onCloseClick() {
    ws.close();
}

function output(str) {
    var log = document.getElementById("log");
    var escaped = str.replace(/&/, "&amp;").replace(/</, "&lt;").
        replace(/>/, "&gt;").replace(/"/, "&quot;"); // "
        log.innerHTML = escaped + "<br>" + log.innerHTML;
}

我认为问题在于您混合使用了 queue 库和 asyncio.queue

queue 是线程安全的,因此是线程间通信的良好机制,但它没有异步 API,因此您在调用时阻塞了 websocket 线程 self.ssi.get(True) 这会阻止来自 运行.

的任何其他 websocket 代码

asyncio.queue 有你想要的 API(你可以 await queue.get()),但不幸的是它不是线程安全的(它是为在单线程异步应用程序中使用而设计的)。

您可以使用 loop.run_in_executor 来等待阻塞的 queue.get(True) 调用。示例见此处 https://carlosmaniero.github.io/asyncio-handle-blocking-functions.html