asyncio: run one function concurrently for multiple requests from websocket clients

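I have a websocket server (Python 3.x) that receives requests, where each request is a URL variable. It works fine, except that it executes the requests serially, one after another. While the function is running, it also blocks other clients that try to connect. Non-blocking is what I want!

Here is what I've got: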


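ANSWER (an illustration and asyncio.subprocess are in the accepted answer)

So, I didn't get very far with this frustration. I reverted to my original code, and it turns out that you need to make the function sleep with await asyncio.sleep(.001). Now it runs great: I tested it with multiple clients at the same time and it handles them asynchronously.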

import asyncio
import json
import time

import websockets


async def handler(websocket, path):
    print("New client connected.")
    await websocket.send('CONNECTED')
    try:
        while True:
            inbound = await websocket.recv()
            if inbound is None:
                break
            for line in range(10):
                time.sleep(1)  # simulated work; note that this call blocks the event loop
                data = {'blah': line}
                await asyncio.sleep(.000001)  # THIS: yields control back to the event loop
                print(data)
                await websocket.send(json.dumps(data))
            await websocket.send(json.dumps({'progress': 'DONE'}))
    except websockets.exceptions.ConnectionClosed:
        print("Client disconnected.")


if __name__ == "__main__":
    server = websockets.serve(handler, '0.0.0.0', 8080)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server)
    loop.run_forever()

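UPDATE: As @udi suggested, if you want to run a slow external process, the way to go is asyncio.subprocess rather than subprocess. Reading from a pipe with a blocking call stalls the event loop, and with it every other client; asyncio.subprocess takes care of that.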

time.sleep() is blocking: while it runs, the event loop cannot serve any other client.
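To see the effect in isolation, here is a minimal sketch without websockets. The names blocking_worker, cooperative_worker, timed and compare are made up for this illustration, and the 0.2 second delay is arbitrary. It runs three workers concurrently, once with time.sleep() and once with asyncio.sleep(), and measures the wall time of each batch.

```python
import asyncio
import time


async def blocking_worker(delay):
    # time.sleep() holds the event loop; no other coroutine can run meanwhile.
    time.sleep(delay)


async def cooperative_worker(delay):
    # asyncio.sleep() suspends only this coroutine and lets the others run.
    await asyncio.sleep(delay)


async def timed(worker, count, delay):
    """Run `count` workers concurrently and return the elapsed wall time."""
    t0 = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(count)))
    return time.perf_counter() - t0


async def compare(count=3, delay=0.2):
    blocking = await timed(blocking_worker, count, delay)
    cooperative = await timed(cooperative_worker, count, delay)
    return blocking, cooperative


if __name__ == "__main__":
    blocking, cooperative = asyncio.run(compare())
    print("time.sleep:    {:.2f}s".format(blocking))
    print("asyncio.sleep: {:.2f}s".format(cooperative))
```

The blocking batch should take roughly count * delay because the workers run one after another, while the cooperative batch should take roughly one delay.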

Try this server, which keeps the time.sleep() call:

# blocking_server.py
import asyncio
import time

import websockets

x = 0


async def handler(websocket, path):
    global x
    x += 1
    client_id = x

    try:
        print("[#{}] Connected.".format(client_id))

        n = int(await websocket.recv())
        print("[#{}] Got: {}".format(client_id, n))
        for i in range(1, n + 1):
            print("[#{}] zzz...".format(client_id))
            time.sleep(1)
            print("[#{}] woke up!".format(client_id))
            await asyncio.sleep(.001)
            msg = "*" * i
            print("[#{}] sending: {}".format(client_id, msg))
            await websocket.send(msg)

        msg = "bye!"
        print("[#{}] sending: {}".format(client_id, msg))
        await websocket.send(msg)

        print("[#{}] Done.".format(client_id))

    except websockets.exceptions.ConnectionClosed:
        print("[#{}] Disconnected!.".format(client_id))


if __name__ == "__main__":
    port = 8080
    server = websockets.serve(handler, '0.0.0.0', port)
    print("Started server on port {}".format(port))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server)
    loop.run_forever()

Use this test client:

# test_client.py
import asyncio
import time

import websockets


async def client(client_id, n):
    t0 = time.time()
    async with websockets.connect('ws://localhost:8080') as websocket:
        print("[#{}] > {}".format(client_id, n))
        await websocket.send(str(n))
        while True:
            resp = await websocket.recv()
            print("[#{}] < {}".format(client_id, resp))
            if resp == "bye!":
                break

    print("[#{}] Done in {:.2f} seconds".format(client_id, time.time() - t0))


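async def main():
    # Run four clients concurrently; each one asks the server for 3 messages.
    await asyncio.gather(*(client(i + 1, 3) for i in range(4)))


# asyncio.wait() no longer accepts bare coroutines (Python 3.11+),
# so gather them inside a coroutine and let asyncio.run() manage the loop.
asyncio.run(main())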

Now compare the results after replacing time.sleep(x) with await asyncio.sleep(x)!
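If the slow step is a blocking function that cannot simply be swapped for an awaitable, one option (not part of the code above, so treat it as a sketch) is to hand it to a worker thread with loop.run_in_executor(). Here slow_step and handle_request are hypothetical stand-ins for the blocking work and for a per-client handler.

```python
import asyncio
import time


def slow_step(i):
    # Hypothetical stand-in for any blocking call.
    time.sleep(0.2)
    return "*" * i


async def handle_request(n):
    loop = asyncio.get_running_loop()
    results = []
    for i in range(1, n + 1):
        # Passing None selects the loop's default ThreadPoolExecutor; the event
        # loop stays free to serve other requests while the thread sleeps.
        msg = await loop.run_in_executor(None, slow_step, i)
        results.append(msg)
    return results


async def main():
    t0 = time.perf_counter()
    # Two simulated clients, each needing two slow steps.
    all_results = await asyncio.gather(handle_request(2), handle_request(2))
    return all_results, time.perf_counter() - t0


if __name__ == "__main__":
    all_results, elapsed = asyncio.run(main())
    print(all_results, "in {:.2f}s".format(elapsed))
```

Run serially, the four steps would need about 0.8 seconds; because each request waits on its own thread, the two requests overlap and the total should be closer to 0.4 seconds.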

If you need to run a slow external process via asyncio, try asyncio.subprocess:

Example external program:

# I am `slow_writer.py`
import sys
import time

n = int(sys.argv[1])

for i in range(1, n + 1):
    time.sleep(1)
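    # Flush each line; otherwise stdout is block-buffered when it is a pipe
    # and the parent only sees the output when the program exits.
    print("*" * i, flush=True)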

Use this server:

# nonblocking_server.py

import asyncio
import sys

import websockets

x = 0


async def handler(websocket, path):
    global x
    x += 1
    client_id = x

    try:
        print("[#{}] Connected.".format(client_id))

        n = int(await websocket.recv())

        print("[#{}] Got: {}. Running subprocess..".format(client_id, n))

        cmd = (sys.executable, 'slow_writer.py', str(n))
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE)

        async for data in proc.stdout:
            print("[#{}] got from subprocess, sending: {}".format(
                client_id, data))
            await websocket.send(data.decode().strip())

        return_value = await proc.wait()
        print("[#{}] Subprocess done.".format(client_id))

        msg = "bye!"
        print("[#{}] sending: {}".format(client_id, msg))
        await websocket.send(msg)

        print("[#{}] Done.".format(client_id))

    except websockets.exceptions.ConnectionClosed:
        print("[#{}] Disconnected!.".format(client_id))


if __name__ == "__main__":

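    # Subprocesses on Windows need the proactor event loop. It has been the
    # default there since Python 3.8, so this only matters on older versions.
    if sys.platform == 'win32':
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)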

    port = 8080
    server = websockets.serve(handler, '0.0.0.0', port)
    print("Started server on port {}".format(port))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server)
    loop.run_forever()
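The subprocess-reading part of the server above can also be tried without websockets. The sketch below is only an illustration: CHILD_CODE is an inline stand-in for slow_writer.py (with a shorter delay), and read_lines is a hypothetical helper name. It starts two child processes and collects their output lines concurrently.

```python
import asyncio
import sys

# Inline stand-in for slow_writer.py so that the example is self-contained.
CHILD_CODE = (
    "import sys, time\n"
    "for i in range(1, int(sys.argv[1]) + 1):\n"
    "    time.sleep(0.1)\n"
    "    print('*' * i, flush=True)\n"
)


async def read_lines(n):
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD_CODE, str(n),
        stdout=asyncio.subprocess.PIPE)
    lines = []
    # Iterating over the StreamReader yields one line at a time and awaits
    # between lines, so other coroutines keep running in the meantime.
    async for raw in proc.stdout:
        lines.append(raw.decode().strip())
    return_code = await proc.wait()
    return lines, return_code


async def main():
    # Two child processes read concurrently on the same event loop.
    return await asyncio.gather(read_lines(2), read_lines(3))


if __name__ == "__main__":
    for lines, return_code in asyncio.run(main()):
        print(return_code, lines)
```

In the server, each decoded line is sent to the websocket client instead of being appended to a list.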