How to make tasks in ProcessPoolExecutor behave like daemon processes?
Python 3.6.6

The code is as follows:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

executor_processes = ProcessPoolExecutor(2)

def calculate():
    while True:
        print("while")
        time.sleep(1)

async def async_method():
    loop_ = asyncio.get_event_loop()
    loop_.run_in_executor(executor_processes, calculate)
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")
Output:
while
finish sleep
main_thread is finished
while
while
...
I would like the child process to be terminated, as it is when the process is spawned with the daemon attribute set, e.g.:
import asyncio
import time
import multiprocessing

def calculate():
    while True:
        print("while")
        time.sleep(1)

async def async_method():
    proc = multiprocessing.Process(target=calculate)
    proc.daemon = True
    proc.start()
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")
Output:
while
finish sleep
main_thread is finished
Question: how can I change the behavior of loop_.run_in_executor(executor_processes, calculate) to be "daemon-like"?
The code you show is obviously just a small example to demonstrate what you would like to achieve. We don't know your real-world task/problem, but honestly, I'm not convinced the road you are taking here is the right one.
ProcessPoolExecutor is part of the concurrent.futures standard library package. It returns a Future to the caller upon invoking submit(). That Future is a proxy for the result of a computation that has not yet completed. It is a promise, although that term is technically not quite correct in this context; see the Wiki page for the distinction.
This implies that the computation is expected to complete in a finite amount of time and produce a result.
That is why the ThreadPoolExecutor and ProcessPoolExecutor implementations in Python do not allow spawning daemonic workers: asking for a promise of a result you don't actually want fulfilled does not make much sense.
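You can observe this non-daemonic nature directly. The small check below, using only documented APIs, forces a pool to spawn a worker and inspects its daemon flag (it assumes no other multiprocessing children are alive in the interpreter):

```python
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor(1)
# Submitting a task forces the pool to spawn its worker process.
executor.submit(time.sleep, 0.1).result()

# The worker is an ordinary, non-daemonic child process.
workers = multiprocessing.active_children()
print([p.daemon for p in workers])  # → [False]
executor.shutdown()
```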
How can you still achieve your goal?

1 - Subclass ProcessPoolExecutor?
You could intercept the creation and start of new processes in _adjust_process_count() to sneak in p.daemon = True. However, since concurrent.futures is not designed with indefinitely running tasks in mind, this won't help much. Unlike multiprocessing, concurrent.futures.process defines an exit handler that does not take daemon processes into account. It just tries to join() everything, and that can take a while with infinite loops.
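For illustration, such a subclass could look like the sketch below. It copies the private Python 3.6.6 body of _adjust_process_count() with the daemon flag sneaked in; this is internal API that has changed in later versions, so treat it as a sketch rather than a supported solution, and remember that the exit handler will still try to join() these workers:

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import _process_worker  # private, 3.6 layout

class DaemonicProcessPoolExecutor(ProcessPoolExecutor):
    def _adjust_process_count(self):
        # Same loop as the Python 3.6.6 stdlib version, with daemon=True
        # set before start() (the flag cannot be changed on a running process).
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue, self._result_queue))
            p.daemon = True
            p.start()
            self._processes[p.pid] = p
```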
2 - Define your own exit handler!

You can do what multiprocessing and concurrent.futures.process both do: define an exit handler that cleans up when your Python process is about to shut down. atexit can help with that:
import atexit

executor_processes = ProcessPoolExecutor(2)

def calculate():
    while True:
        print("while")
        time.sleep(1)

def end_processes():
    [proc.terminate() for proc in multiprocessing.active_children()]

async def async_method():
    [...]

if __name__ == '__main__':
    atexit.register(end_processes)
    loop = asyncio.get_event_loop()
    [...]
Note: this will terminate all child processes that are still alive when the process ends. If there are child processes you want to shut down gracefully, keep a handle around and do that yourself before the instructions in your code end.
Also note that a process can refuse to honor terminate(). kill() is your last resort.
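That last difference matters in practice: a child can install a handler for the SIGTERM that terminate() sends, while SIGKILL cannot be caught or ignored. A POSIX-only sketch (Process.kill() exists since Python 3.7; on 3.6.6 you would use os.kill(proc.pid, signal.SIGKILL) instead):

```python
import multiprocessing
import signal
import time

def stubborn():
    # Ignore SIGTERM so that terminate() has no effect on this child.
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    while True:
        time.sleep(0.1)

# The 'fork' start method keeps this runnable without an import guard (POSIX-only).
ctx = multiprocessing.get_context('fork')
proc = ctx.Process(target=stubborn)
proc.start()
time.sleep(0.5)           # give the child time to install its handler

proc.terminate()          # sends SIGTERM - the child ignores it
time.sleep(0.5)
print(proc.is_alive())    # → True

proc.kill()               # sends SIGKILL - cannot be caught or ignored
proc.join(timeout=5)
print(proc.is_alive())    # → False
```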