asyncio:运行 一个函数线程处理来自 websocket 客户端的多个请求
asyncio: run one function threaded with multiple requests from websocket clients
我有一个 websocket 服务器 (python 3.x) 接收请求,其中每个请求都是一个 url 变量。它运行得很好,除了它只一个接一个地串行执行每个请求。 虽然功能是 运行,但它也会阻止客户端尝试连接。 非阻塞是我想要的!
- 异步
多处理 websocket 和子进程函数的线程。
能够设置要使用的核心数。不过这不是强制性的。
这是我得到的:
ANSWER(插图和 asyncio.subprocess
在已接受的答案中)
所以,我并没有对这种挫败感走得太远。我恢复到原来的代码,结果是,您需要使用 await asyncio.sleep(.001)
使函数休眠。现在它运行得很好,我同时测试了多个客户端并且它异步处理它。
import asyncio, websockets, json
async def handler(websocket, path):
print("New client connected.")
await websocket.send('CONNECTED')
try:
while True:
inbound = await websocket.recv()
if inbound is None:
break
while inbound != None:
import time
for line in range(10):
time.sleep(1)
data = {}
data['blah'] = line
await asyncio.sleep(.000001) # THIS
print(data)
await websocket.send(json.dumps(data))
await websocket.send(json.dumps({'progress': 'DONE'}))
break
except websockets.exceptions.ConnectionClosed:
print("Client disconnected.")
if __name__ == "__main__":
server = websockets.serve(handler, '0.0.0.0', 8080)
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()
更新: 根据 @udi 的建议,如果你想要一个缓慢的外部进程,方法是 asyncio.subprocess 而不是子进程。使用阻塞调用从管道中读取会停止其他线程,这是 asyncio.subprocess 所关心的。
time.sleep()
正在阻塞。
尝试:
# blocking_server.py
import asyncio
import time
import websockets
x = 0
async def handler(websocket, path):
global x
x += 1
client_id = x
try:
print("[#{}] Connected.".format(client_id))
n = int(await websocket.recv())
print("[#{}] Got: {}".format(client_id, n))
for i in range(1, n + 1):
print("[#{}] zzz...".format(client_id))
time.sleep(1)
print("[#{}] woke up!".format(client_id))
await asyncio.sleep(.001)
msg = "*" * i
print("[#{}] sending: {}".format(client_id, msg))
await websocket.send(msg)
msg = "bye!"
print("[#{}] sending: {}".format(client_id, msg))
await websocket.send(msg)
print("[#{}] Done.".format(client_id, msg))
except websockets.exceptions.ConnectionClosed:
print("[#{}] Disconnected!.".format(client_id))
if __name__ == "__main__":
port = 8080
server = websockets.serve(handler, '0.0.0.0', port)
print("Started server on port {}".format(port))
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()
使用此测试客户端:
# test_client.py
import asyncio
import time
import websockets
async def client(client_id, n):
t0 = time.time()
async with websockets.connect('ws://localhost:8080') as websocket:
print("[#{}] > {}".format(client_id, n))
await websocket.send(str(n))
while True:
resp = await websocket.recv()
print("[#{}] < {}".format(client_id, resp))
if resp == "bye!":
break
print("[#{}] Done in {:.2f} seconds".format(client_id, time.time() - t0))
tasks = [client(i + 1, 3) for i in range(4)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
现在比较一下time.sleep(x)
换成await asyncio.sleep(x)
的结果!
如果您需要通过 asyncio 运行 一个缓慢的外部进程,尝试 asynico.subprocess
:
外部程序示例:
# I am `slow_writer.py`
import sys
import time
n = int(sys.argv[1])
for i in range(1, n + 1):
time.sleep(1)
print("*" * i)
使用此服务器:
# nonblocking_server.py
import asyncio
import sys
import websockets
x = 0
async def handler(websocket, path):
global x
x += 1
client_id = x
try:
print("[#{}] Connected.".format(client_id))
n = int(await websocket.recv())
print("[#{}] Got: {}. Running subprocess..".format(client_id, n))
cmd = (sys.executable, 'slow_writer.py', str(n))
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE)
async for data in proc.stdout:
print("[#{}] got from subprocess, sending: {}".format(
client_id, data))
await websocket.send(data.decode().strip())
return_value = await proc.wait()
print("[#{}] Subprocess done.".format(client_id))
msg = "bye!"
print("[#{}] sending: {}".format(client_id, msg))
await websocket.send(msg)
print("[#{}] Done.".format(client_id, msg))
except websockets.exceptions.ConnectionClosed:
print("[#{}] Disconnected!.".format(client_id))
if __name__ == "__main__":
if sys.platform == 'win32':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
port = 8080
server = websockets.serve(handler, '0.0.0.0', port)
print("Started server on port {}".format(port))
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()
我有一个 websocket 服务器 (python 3.x) 接收请求,其中每个请求都是一个 url 变量。它运行得很好,除了它只一个接一个地串行执行每个请求。 虽然功能是 运行,但它也会阻止客户端尝试连接。 非阻塞是我想要的!
- 异步
多处理websocket 和子进程函数的线程。 能够设置要使用的核心数。不过这不是强制性的。
这是我得到的:
ANSWER(插图和 asyncio.subprocess
在已接受的答案中)
所以,我并没有对这种挫败感走得太远。我恢复到原来的代码,结果是,您需要使用 await asyncio.sleep(.001)
使函数休眠。现在它运行得很好,我同时测试了多个客户端并且它异步处理它。
import asyncio, websockets, json
async def handler(websocket, path):
print("New client connected.")
await websocket.send('CONNECTED')
try:
while True:
inbound = await websocket.recv()
if inbound is None:
break
while inbound != None:
import time
for line in range(10):
time.sleep(1)
data = {}
data['blah'] = line
await asyncio.sleep(.000001) # THIS
print(data)
await websocket.send(json.dumps(data))
await websocket.send(json.dumps({'progress': 'DONE'}))
break
except websockets.exceptions.ConnectionClosed:
print("Client disconnected.")
if __name__ == "__main__":
server = websockets.serve(handler, '0.0.0.0', 8080)
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()
更新: 根据 @udi 的建议,如果你想要一个缓慢的外部进程,方法是 asyncio.subprocess 而不是子进程。使用阻塞调用从管道中读取会停止其他线程,这是 asyncio.subprocess 所关心的。
time.sleep()
正在阻塞。
尝试:
# blocking_server.py
import asyncio
import time
import websockets
x = 0
async def handler(websocket, path):
global x
x += 1
client_id = x
try:
print("[#{}] Connected.".format(client_id))
n = int(await websocket.recv())
print("[#{}] Got: {}".format(client_id, n))
for i in range(1, n + 1):
print("[#{}] zzz...".format(client_id))
time.sleep(1)
print("[#{}] woke up!".format(client_id))
await asyncio.sleep(.001)
msg = "*" * i
print("[#{}] sending: {}".format(client_id, msg))
await websocket.send(msg)
msg = "bye!"
print("[#{}] sending: {}".format(client_id, msg))
await websocket.send(msg)
print("[#{}] Done.".format(client_id, msg))
except websockets.exceptions.ConnectionClosed:
print("[#{}] Disconnected!.".format(client_id))
if __name__ == "__main__":
port = 8080
server = websockets.serve(handler, '0.0.0.0', port)
print("Started server on port {}".format(port))
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()
使用此测试客户端:
# test_client.py
import asyncio
import time
import websockets
async def client(client_id, n):
t0 = time.time()
async with websockets.connect('ws://localhost:8080') as websocket:
print("[#{}] > {}".format(client_id, n))
await websocket.send(str(n))
while True:
resp = await websocket.recv()
print("[#{}] < {}".format(client_id, resp))
if resp == "bye!":
break
print("[#{}] Done in {:.2f} seconds".format(client_id, time.time() - t0))
tasks = [client(i + 1, 3) for i in range(4)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
现在比较一下time.sleep(x)
换成await asyncio.sleep(x)
的结果!
如果您需要通过 asyncio 运行 一个缓慢的外部进程,尝试 asynico.subprocess
:
外部程序示例:
# I am `slow_writer.py`
import sys
import time
n = int(sys.argv[1])
for i in range(1, n + 1):
time.sleep(1)
print("*" * i)
使用此服务器:
# nonblocking_server.py
import asyncio
import sys
import websockets
x = 0
async def handler(websocket, path):
global x
x += 1
client_id = x
try:
print("[#{}] Connected.".format(client_id))
n = int(await websocket.recv())
print("[#{}] Got: {}. Running subprocess..".format(client_id, n))
cmd = (sys.executable, 'slow_writer.py', str(n))
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE)
async for data in proc.stdout:
print("[#{}] got from subprocess, sending: {}".format(
client_id, data))
await websocket.send(data.decode().strip())
return_value = await proc.wait()
print("[#{}] Subprocess done.".format(client_id))
msg = "bye!"
print("[#{}] sending: {}".format(client_id, msg))
await websocket.send(msg)
print("[#{}] Done.".format(client_id, msg))
except websockets.exceptions.ConnectionClosed:
print("[#{}] Disconnected!.".format(client_id))
if __name__ == "__main__":
if sys.platform == 'win32':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
port = 8080
server = websockets.serve(handler, '0.0.0.0', port)
print("Started server on port {}".format(port))
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()