使用 asyncio 时无法使用 os.fork() 将多个进程绑定到一个套接字服务器

Could not use os.fork() bind several process to one socket server when using asyncio

我们都知道使用 asyncio 可以显着提高套接字服务器的性能,如果我们可以利用 cpu 中的所有内核(可能通过多处理模块或 os.fork()等)

我现在正在尝试构建一个多核套接字服务器演示,其中一个异步套接字服务器侦听每个核心并全部绑定到一个端口。只需创建一个异步服务器,然后使用 os.fork(),让进程有竞争力地工作。

然而,当我尝试 fork 时,单核精细代码 运行 遇到了一些麻烦。似乎在 epoll 选择器模块中注册来自不同进程的相同文件描述符存在一些问题。

我在下面展示了一些代码,有人可以帮我吗?


下面是一个使用asyncio的简单、逻辑清晰的回显服务器代码:

import os
import asyncio #,uvloop
from socket import *

# hendler sends back incoming message directly
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET ,SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
    sock.bind(('',25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client ,addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop ,client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()

在套接字启动之后和服务器开始服务之前,在我尝试分叉之前它工作正常。 (此逻辑在基于同步线程的代码中运行良好。)


当我尝试这个时:

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop.create_task(serving(loop, sock))
loop.run_forever()

理论上分叉的进程是bounl到同一个套接字?和 运行 在同一个事件循环中?那么工作就好了?

但是我收到了这些错误消息:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python 版本 3.7.3,

我对发生的事情一头雾水。

有人能帮忙吗?谢谢

根据 the tracker issue, it is not supported to fork an existing asyncio event loop and attempt to use it from multiple processes. However, according to Yury's comment 在同一问题上的说法,multi-processing 可以在开始循环之前通过分叉来实现,因此 运行 每个 child 中的完全独立的异步循环。

您的代码实际上证实了这种可能性:虽然 create_serverasync def,但它不等待任何东西,也不使用 loop 参数。所以我们可以通过使 create_server 成为常规函数,删除 loop 参数,并在 os.fork() 之前调用它,并且在分叉后仅 运行 事件循环来实现 Yury 的方法:

import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request ,create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()

for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()