Python websockets,无法接收消息
Python websockets, unable to receive messages
我正在使用来自父线程的 websockets in an python project i'm working on. The websocket is being run in an thread and given 2 queue's。我正在使用 javascript 连接到 websocket 服务器。
我能够通过self.ssi.get(True)
从父线程获取消息并将它们传递给 javascript websocket 客户端。
但是我无法接收来自客户端的消息。当我使用 zaproxy 时,我可以看到正在通过的消息。在 websocket 服务器上,我还能够看到数据包到达接口。 Python 不会抛出任何错误,并且 logger.setLevel(logging.DEBUG)
不会显示消息到达的方式与我能够看到正在发送的消息相同。
我一直在努力解决这个问题,但我 运行 找不到问题的主意,欢迎任何帮助。
Python 网络套接字服务器:
import websockets
import logging
import asyncio
import ssl
class websocket:
def __init__(self,ssi,sso):
self.ssi = ssi
self.sso = sso
logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)
# logger.addHandler(logging.FileHandler('debug.log'))
logger.addHandler(logging.StreamHandler())
sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
sslc.load_cert_chain(
'keys/wss.server.crt',
'keys/wss.server.key')
loop = asyncio.new_event_loop()
wsrv = websockets.serve(
self.handler,
host='0.0.0.0',
port=9000,
ssl=sslc,
loop=loop)
loop.run_until_complete(wsrv)
loop.run_forever()
async def handler(self, wss, path):
consumer_task = asyncio.ensure_future(self.consumerHandler(wss, path))
producer_task = asyncio.ensure_future(self.producerHandler(wss, path))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,)
for task in pending:
task.cancel()
async def producerHandler(self, wss, path):
while True:
msg = await self.producer()
await wss.send(str(msg))
async def consumerHandler(self, wss, path):
async for msg in wss:
await self.consumer(msg)
async def producer(self):
return self.ssi.get(True)
async def consumer(self, msg):
self.sso.put(msg.data)
Javascript 客户:
var ws;
function ws_init() {
ws = new WebSocket("wss://pri.local:9000/");
ws.onopen = function(e) {
output("connected");
};
ws.onmessage = function(e) {
output("i: " + e.data);
};
ws.onclose = function() {
output("disconnect");
};
ws.onerror = function(e) {
output("onerror");
console.log(e)
};
}
function onSubmit() {
var input = document.getElementById("input");
ws.send(input.value);
output("o: " + input.value);
input.value = "";
input.focus();
}
function onCloseClick() {
ws.close();
}
function output(str) {
var log = document.getElementById("log");
var escaped = str.replace(/&/, "&").replace(/</, "<").
replace(/>/, ">").replace(/"/, """); // "
log.innerHTML = escaped + "<br>" + log.innerHTML;
}
我认为问题在于您混合使用了 queue
库和 asyncio.queue
。
queue
是线程安全的,因此是线程间通信的良好机制,但它没有异步 API,因此您在调用时阻塞了 websocket 线程 self.ssi.get(True)
这会阻止来自 运行.
的任何其他 websocket 代码
asyncio.queue
有你想要的 API(你可以 await queue.get()
),但不幸的是它不是线程安全的(它是为在单线程异步应用程序中使用而设计的)。
您可以使用 loop.run_in_executor
来等待阻塞的 queue.get(True)
调用。示例见此处 https://carlosmaniero.github.io/asyncio-handle-blocking-functions.html
我正在使用来自父线程的 websockets in an python project i'm working on. The websocket is being run in an thread and given 2 queue's。我正在使用 javascript 连接到 websocket 服务器。
我能够通过self.ssi.get(True)
从父线程获取消息并将它们传递给 javascript websocket 客户端。
但是我无法接收来自客户端的消息。当我使用 zaproxy 时,我可以看到正在通过的消息。在 websocket 服务器上,我还能够看到数据包到达接口。 Python 不会抛出任何错误,并且 logger.setLevel(logging.DEBUG)
不会显示消息到达的方式与我能够看到正在发送的消息相同。
我一直在努力解决这个问题,但我 运行 找不到问题的主意,欢迎任何帮助。
Python 网络套接字服务器:
import websockets
import logging
import asyncio
import ssl
class websocket:
def __init__(self,ssi,sso):
self.ssi = ssi
self.sso = sso
logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)
# logger.addHandler(logging.FileHandler('debug.log'))
logger.addHandler(logging.StreamHandler())
sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
sslc.load_cert_chain(
'keys/wss.server.crt',
'keys/wss.server.key')
loop = asyncio.new_event_loop()
wsrv = websockets.serve(
self.handler,
host='0.0.0.0',
port=9000,
ssl=sslc,
loop=loop)
loop.run_until_complete(wsrv)
loop.run_forever()
async def handler(self, wss, path):
consumer_task = asyncio.ensure_future(self.consumerHandler(wss, path))
producer_task = asyncio.ensure_future(self.producerHandler(wss, path))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,)
for task in pending:
task.cancel()
async def producerHandler(self, wss, path):
while True:
msg = await self.producer()
await wss.send(str(msg))
async def consumerHandler(self, wss, path):
async for msg in wss:
await self.consumer(msg)
async def producer(self):
return self.ssi.get(True)
async def consumer(self, msg):
self.sso.put(msg.data)
Javascript 客户:
var ws;
function ws_init() {
ws = new WebSocket("wss://pri.local:9000/");
ws.onopen = function(e) {
output("connected");
};
ws.onmessage = function(e) {
output("i: " + e.data);
};
ws.onclose = function() {
output("disconnect");
};
ws.onerror = function(e) {
output("onerror");
console.log(e)
};
}
function onSubmit() {
var input = document.getElementById("input");
ws.send(input.value);
output("o: " + input.value);
input.value = "";
input.focus();
}
function onCloseClick() {
ws.close();
}
function output(str) {
var log = document.getElementById("log");
var escaped = str.replace(/&/, "&").replace(/</, "<").
replace(/>/, ">").replace(/"/, """); // "
log.innerHTML = escaped + "<br>" + log.innerHTML;
}
我认为问题在于您混合使用了 queue
库和 asyncio.queue
。
queue
是线程安全的,因此是线程间通信的良好机制,但它没有异步 API,因此您在调用时阻塞了 websocket 线程 self.ssi.get(True)
这会阻止来自 运行.
asyncio.queue
有你想要的 API(你可以 await queue.get()
),但不幸的是它不是线程安全的(它是为在单线程异步应用程序中使用而设计的)。
您可以使用 loop.run_in_executor
来等待阻塞的 queue.get(True)
调用。示例见此处 https://carlosmaniero.github.io/asyncio-handle-blocking-functions.html