从基于 gunicorn 的网络会话中终止异步多处理线程
Terminate asynchronous multiprocessing thread from gunicorn-based web session
我使用 Django 为多核科学计算库创建基于浏览器的 GUI,使用 gunicorn 作为网络服务器(另请参阅 Start multicore background process from Django view)。我调用库函数如下:
p = multiprocessing.Process(target=library_function, args=())
p.start()
print("test")
库函数本身处理多核逻辑,所以我在这里使用多处理模块的重点是让用户能够终止进程(通过p.terminate()
)。但是,这目前不起作用。我的假设是我还需要 p.join()
来终止 library_function()
调用的任何内容。问题是,当我在 p.start()
之后调用 p.join()
时,该过程似乎不再异步 运行,即在示例中,即 "test" 不会立即打印。但是,在我的用例中肯定需要异步。
是否有人提示如何 (1) 运行 后台进程和 (2) 仍然允许用户终止进程?
更新
根据 noxdafox 的评论,这是我现在的快速和肮脏的解决方案:
import psutil, signal, os, multiprocessing
# start process
p = multiprocessing.Process(target=library_function, args=())
p.start()
# stop process
for child in psutil.Process(p.pid).children(recursive=True):
child.kill() # kill child processes
os.kill(p.pid, signal.SIGKILL) # kill parent process
p.join() # kills parent zombie process
对于你的第一个问题,我建议异步任务处理Celery会是更好的选择。
使用 Celery,您可以让用户撤销任务:
result = add.apply_async(args=[2, 2], countdown=120)
result.revoke()
通过撤销,celery work 将不会执行任务
调用 p.terminate()
向进程发送 SIGTERM 信号。默认情况下 Python 解释器在收到此类信号时退出,但在某些情况下它不会:
- 进程正在覆盖 SIGTERM 信号的默认行为。例如,这可能由 Django 框架本身设置(我不知道它的内部结构)。
- 进程正在等待对 return 的 C/C++ 调用,无法对 SIGTERM 信号作出反应。
p.join()
阻塞调用者直到子进程没有结束。必须始终调用它以清理过期的子进程。您失去并行性的原因是您启动了新进程并等待它完成。
如果你想在一个单独的进程上生成一个任务并安全地终止它,你可以使用 Pebble 库:
from pebble import process
@process.concurrent
def library_function( ... ):
return
task = library_function( ... )
task.cancel()
更简单。
from pebble import process
task = process.concurrent(target=library_function, args=( ... ))
task.cancel()
编辑:你想做的是不同的,你现在尝试做的方式有点危险。
运行 子进程中的进程池(来自 multiprocessing.Process
而不是 subprocess
)不是很理想。此外,终止包含 Pool 的进程将导致所有工作进程的泄漏。
来自 documentation of Parallel,这是后台使用的进程池的实现。
Interruption of multiprocesses jobs with ‘Ctrl-C’
为此,您可以向子进程发送一个 SIGINT 信号,看看它是否真的有效。
import multiprocessing, signal, os
p = multiprocessing.Process(target=library_function, args=())
p.start()
# stop the job
os.kill(p.pid, signal.SIGINT)
p.join()
我使用 Django 为多核科学计算库创建基于浏览器的 GUI,使用 gunicorn 作为网络服务器(另请参阅 Start multicore background process from Django view)。我调用库函数如下:
p = multiprocessing.Process(target=library_function, args=())
p.start()
print("test")
库函数本身处理多核逻辑,所以我在这里使用多处理模块的重点是让用户能够终止进程(通过p.terminate()
)。但是,这目前不起作用。我的假设是我还需要 p.join()
来终止 library_function()
调用的任何内容。问题是,当我在 p.start()
之后调用 p.join()
时,该过程似乎不再异步 运行,即在示例中,即 "test" 不会立即打印。但是,在我的用例中肯定需要异步。
是否有人提示如何 (1) 运行 后台进程和 (2) 仍然允许用户终止进程?
更新
根据 noxdafox 的评论,这是我现在的快速和肮脏的解决方案:
import psutil, signal, os, multiprocessing
# start process
p = multiprocessing.Process(target=library_function, args=())
p.start()
# stop process
for child in psutil.Process(p.pid).children(recursive=True):
child.kill() # kill child processes
os.kill(p.pid, signal.SIGKILL) # kill parent process
p.join() # kills parent zombie process
对于你的第一个问题,我建议异步任务处理Celery会是更好的选择。
使用 Celery,您可以让用户撤销任务:
result = add.apply_async(args=[2, 2], countdown=120)
result.revoke()
通过撤销,celery work 将不会执行任务
调用 p.terminate()
向进程发送 SIGTERM 信号。默认情况下 Python 解释器在收到此类信号时退出,但在某些情况下它不会:
- 进程正在覆盖 SIGTERM 信号的默认行为。例如,这可能由 Django 框架本身设置(我不知道它的内部结构)。
- 进程正在等待对 return 的 C/C++ 调用,无法对 SIGTERM 信号作出反应。
p.join()
阻塞调用者直到子进程没有结束。必须始终调用它以清理过期的子进程。您失去并行性的原因是您启动了新进程并等待它完成。
如果你想在一个单独的进程上生成一个任务并安全地终止它,你可以使用 Pebble 库:
from pebble import process
@process.concurrent
def library_function( ... ):
return
task = library_function( ... )
task.cancel()
更简单。
from pebble import process
task = process.concurrent(target=library_function, args=( ... ))
task.cancel()
编辑:你想做的是不同的,你现在尝试做的方式有点危险。
运行 子进程中的进程池(来自 multiprocessing.Process
而不是 subprocess
)不是很理想。此外,终止包含 Pool 的进程将导致所有工作进程的泄漏。
来自 documentation of Parallel,这是后台使用的进程池的实现。
Interruption of multiprocesses jobs with ‘Ctrl-C’
为此,您可以向子进程发送一个 SIGINT 信号,看看它是否真的有效。
import multiprocessing, signal, os
p = multiprocessing.Process(target=library_function, args=())
p.start()
# stop the job
os.kill(p.pid, signal.SIGINT)
p.join()