Python AsyncIO within MultiProcessing Processes
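I am trying to create two processes that run forever, each running an asyncio loop inside of them.
I want to understand whether the logic is correct. Is there a better way to do the same thing?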
import asyncio
import multiprocessing

async def my_async_func(topic):
    while True:
        await asyncio.sleep(5)
        print(topic)

def create_aio_loop(topic):
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(my_async_func(topic), loop=loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.stop()

def main():
    topic_a = 'New to Asyncio'
    topic_b = 'New to Multiprocessing'
    process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
    process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
    processes = [process_a, process_b]
    try:
        for process in processes:
            process.start()
    except KeyboardInterrupt:
        for process in processes:
            process.terminate()
            process.join()

if __name__ == '__main__':
    main()
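I assume you know why you want to dispatch some of your code across several (virtual) cores (multiprocessing) and run the rest concurrently on the same core (asyncio).
Then I think you did it right: you spawn two processes, each with its own asyncio loop. The only improvement I could find is to use loop.run_until_complete, which removes one line of code :) :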
import os
import asyncio
import multiprocessing

async def my_async_func(topic):
    while True:
        await asyncio.sleep(5)
        print(topic)

def create_aio_loop(topic):
    process_name = "[Process %s, topic %s]" % (os.getpid(), topic)
    print("%s Started " % process_name)
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(my_async_func(topic))
    except KeyboardInterrupt:
        print("%s Loop interrupted" % process_name)
        loop.stop()
    print("%s terminating" % process_name)

if __name__ == '__main__':
    topic_a = 'New to Asyncio'
    topic_b = 'New to Multiprocessing'
    process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
    process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
    processes = [process_a, process_b]
    try:
        for process in processes:
            process.start()
    except KeyboardInterrupt:
        for process in processes:
            process.terminate()
            process.join()
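Oh, and I also suggest prefixing every message with the process ID, which makes multiprocess debugging much easier. I have included an example with start/terminate print messages.
Running this yields: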
>python tmp_asyncio.py
[Process 11456, topic New to Multiprocessing] Started
[Process 18396, topic New to Asyncio] Started
New to Asyncio
New to Multiprocessing
(here I pressed Ctrl+C)
[Process 11456, topic New to Multiprocessing] Loop interrupted
[Process 11456, topic New to Multiprocessing] terminating
[Process 18396, topic New to Asyncio] Loop interrupted
[Process 18396, topic New to Asyncio] terminating
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\util.py", line 310, in _exit_function
    p.join()
  File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\process.py", line 121, in join
    res = self._popen.wait(timeout)
  File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\popen_spawn_win32.py", line 81, in wait
    res = _winapi.WaitForSingleObject(int(self._handle), msecs)
KeyboardInterrupt
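The traceback at the end of that output appears to come from the parent process: it starts the children and falls straight off the end of the script, so the only join happens in multiprocessing's exit hook, and Ctrl+C interrupts it there. A minimal sketch of one way to avoid this is below, assuming Python 3.7+ so that asyncio.run is available. The names ticker, run_worker, interval and repeats are made up for this illustration and are not part of the code above; repeats exists only so the coroutine can be run for a finite number of ticks when trying it out.

```python
import asyncio
import multiprocessing
import os


async def ticker(topic, interval=5.0, repeats=None):
    """Print `topic` every `interval` seconds; runs forever when repeats is None."""
    printed = 0
    while repeats is None or printed < repeats:
        await asyncio.sleep(interval)
        print("[Process %s] %s" % (os.getpid(), topic))
        printed += 1
    return printed


def run_worker(topic, interval=5.0, repeats=None):
    """Process target: asyncio.run creates a fresh event loop and closes it on exit."""
    try:
        return asyncio.run(ticker(topic, interval, repeats))
    except KeyboardInterrupt:
        # Ctrl+C typically reaches the child processes as well; exit quietly.
        print("[Process %s] interrupted" % os.getpid())
        return None


def main():
    topics = ["New to Asyncio", "New to Multiprocessing"]
    processes = [
        multiprocessing.Process(target=run_worker, args=(topic,))
        for topic in topics
    ]
    for process in processes:
        process.start()
    try:
        # Wait here so that Ctrl+C is caught by this try block
        # instead of surfacing inside the interpreter's exit hook.
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        # Give each child a moment to finish on its own, then force it.
        for process in processes:
            process.join(timeout=2)
            if process.is_alive():
                process.terminate()
                process.join()
```

To use it as a script, call main() under an if __name__ == '__main__': guard, as in the code above; the guard matters on Windows, where child processes are spawned by re-importing the main module. Joining in the normal path rather than only in the except branch is the design choice here: the parent stays alive as long as the workers do, and the interrupt is handled in one predictable place.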