Python AsyncIO within MultiProcessing Processes

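I am trying to create two processes that run forever, each running its own asyncio event loop.

I would like to know whether this logic is correct. Is there a better way to do the same thing?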

import asyncio
import multiprocessing


async def my_async_func(topic):
    while True:
        await asyncio.sleep(5)
        print(topic)


def create_aio_loop(topic):
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(my_async_func(topic), loop=loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.stop()


def main():
    topic_a = 'New to Asyncio'
    topic_b = 'New to Multiprocessing'
    process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
    process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
    processes = [process_a, process_b]

    try:
        for process in processes:
            process.start()
    except KeyboardInterrupt:
        for process in processes:
            process.terminate()
            process.join()


if __name__ == '__main__':
    main()

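I assume you know why you want to spread part of your code across multiple (virtual) cores (multiprocessing) and run the rest concurrently on a single core (asyncio).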

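In that case I think you are doing it right: you spawn two processes, and each one has its own asyncio loop. The only improvement I could find is to use loop.run_until_complete, which saves one line of code :) :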

import os
import asyncio
import multiprocessing

async def my_async_func(topic):
    while True:
        await asyncio.sleep(5)
        print(topic)


def create_aio_loop(topic):
    process_name = "[Process %s, topic %s]" % (os.getpid(), topic)
    print("%s Started " % process_name)

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(my_async_func(topic))
    except KeyboardInterrupt:
        print("%s Loop interrupted" % process_name)
        loop.stop()

    print("%s terminating" % process_name)


if __name__ == '__main__':
    topic_a = 'New to Asyncio'
    topic_b = 'New to Multiprocessing'
    process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
    process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
    processes = [process_a, process_b]

    try:
        for process in processes:
            process.start()
    except KeyboardInterrupt:
        for process in processes:
            process.terminate()
            process.join()

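Oh, I also suggest prefixing every message with the process ID, which makes debugging multiprocess code much easier. I have added start/terminate print messages to the example to show this.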

Running the script (saved here as tmp_asyncio.py) produces:

>python tmp_asyncio.py
[Process 11456, topic New to Multiprocessing] Started
[Process 18396, topic New to Asyncio] Started
New to Asyncio
New to Multiprocessing

(here I pressed Ctrl+C)

[Process 11456, topic New to Multiprocessing] Loop interrupted
[Process 11456, topic New to Multiprocessing] terminating
[Process 18396, topic New to Asyncio] Loop interrupted
[Process 18396, topic New to Asyncio] terminating
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\util.py", line 310, in _exit_function
    p.join()
  File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\process.py", line 121, in join
    res = self._popen.wait(timeout)
  File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\popen_spawn_win32.py", line 81, in wait
    res = _winapi.WaitForSingleObject(int(self._handle), msecs)
KeyboardInterrupt
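
The traceback at the end comes from the parent process. Its try block only wraps process.start(), which returns immediately, so by the time Ctrl+C arrives the parent is already waiting in multiprocessing's atexit hook and the KeyboardInterrupt surfaces there. Below is a minimal sketch of one way around that, not the only one: the parent joins the children inside its own try block. Beyond that fix, the sketch also swaps in asyncio.run for the manual loop handling and lets the logging module add the process ID prefix. The names publish, run_worker, LOG_FORMAT and the ticks, interval and grace parameters are made up for this illustration and do not appear in the code above.

```python
import asyncio
import logging
import multiprocessing

# %(process)d and %(processName)s are standard LogRecord attributes,
# so every message gets the process ID prefix without manual formatting.
LOG_FORMAT = "[Process %(process)d, %(processName)s] %(message)s"


async def publish(topic, ticks=None, interval=5.0):
    """Log `topic` every `interval` seconds; loop forever when ticks is None."""
    log = logging.getLogger("worker")
    count = 0
    while ticks is None or count < ticks:
        await asyncio.sleep(interval)
        log.info(topic)
        count += 1
    return count


def run_worker(topic, ticks=None, interval=5.0):
    """Process target: run one event loop until it finishes or is interrupted."""
    # Configure logging in the child itself; with the "spawn" start method
    # the child does not inherit the parent's logging configuration.
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    log = logging.getLogger("worker")
    log.info("started, topic %s", topic)
    try:
        # asyncio.run creates the loop, runs the coroutine and closes the loop.
        return asyncio.run(publish(topic, ticks, interval))
    except KeyboardInterrupt:
        log.info("loop interrupted")
        return None
    finally:
        log.info("terminating")


def main(topics, ticks=None, interval=5.0, grace=5.0):
    """Start one worker process per topic and wait for all of them."""
    processes = [
        multiprocessing.Process(target=run_worker, args=(topic, ticks, interval))
        for topic in topics
    ]
    for process in processes:
        process.start()
    try:
        # Wait here so that Ctrl+C is caught by this try block
        # instead of by the atexit hook.
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        # The children normally receive the same Ctrl+C. Give each one
        # `grace` seconds to finish, then terminate any still alive.
        for process in processes:
            process.join(timeout=grace)
            if process.is_alive():
                process.terminate()
                process.join()
    return [process.exitcode for process in processes]


if __name__ == "__main__":
    # Two short ticks keep this demo brief; ticks=None runs until Ctrl+C.
    main(["New to Asyncio", "New to Multiprocessing"], ticks=2, interval=0.1)
```

To get the run-forever behaviour from the question, call main with ticks=None and interval=5.0. I have only sketched this, so treat the grace timeout and the terminate fallback as starting points rather than tuned values.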