从基于 gunicorn 的网络会话中终止异步多处理线程

Terminate asynchronous multiprocessing thread from gunicorn-based web session

我使用 Django 为多核科学计算库创建基于浏览器的 GUI,使用 gunicorn 作为网络服务器(另请参阅 Start multicore background process from Django view)。我调用库函数如下:

p = multiprocessing.Process(target=library_function, args=())
p.start()
print("test")

库函数本身处理多核逻辑,所以我在这里使用多处理模块的重点是让用户能够终止进程(通过p.terminate())。但是,这目前不起作用。我的假设是我还需要 p.join() 来终止 library_function() 调用的任何内容。问题是,当我在 p.start() 之后调用 p.join() 时,该过程似乎不再异步 运行,即在示例中,即 "test" 不会立即打印。但是,在我的用例中肯定需要异步。

是否有人提示如何 (1) 运行 后台进程和 (2) 仍然允许用户终止进程?

更新

根据 noxdafox 的评论,这是我现在的快速和肮脏的解决方案:

import psutil, signal, os, multiprocessing

# start process
p = multiprocessing.Process(target=library_function, args=())
p.start()

# stop process
for child in psutil.Process(p.pid).children(recursive=True):
    child.kill() # kill child processes
os.kill(p.pid, signal.SIGKILL) # kill parent process
p.join() # kills parent zombie process

对于你的第一个问题,我建议异步任务处理Celery会是更好的选择。

使用 Celery,您可以让用户撤销任务:

result = add.apply_async(args=[2, 2], countdown=120)

result.revoke()

通过撤销,celery work 将不会执行任务

调用 p.terminate() 向进程发送 SIGTERM 信号。默认情况下 Python 解释器在收到此类信号时退出,但在某些情况下它不会:

  • 进程正在覆盖 SIGTERM 信号的默认行为。例如,这可能由 Django 框架本身设置(我不知道它的内部结构)。
  • 进程正在等待对 return 的 C/C++ 调用,无法对 SIGTERM 信号作出反应。

p.join() 阻塞调用者直到子进程没有结束。必须始终调用它以清理过期的子进程。您失去并行性的原因是您启动了新进程并等待它完成。

如果你想在一个单独的进程上生成一个任务并安全地终止它,你可以使用 Pebble 库:

from pebble import process

@process.concurrent
def library_function( ... ):
    return

task = library_function( ... )
task.cancel()

更简单。

from pebble import process

task = process.concurrent(target=library_function, args=( ... ))
task.cancel()

编辑:你想做的是不同的,你现在尝试做的方式有点危险。

运行 子进程中的进程池(来自 multiprocessing.Process 而不是 subprocess)不是很理想。此外,终止包含 Pool 的进程将导致所有工作进程的泄漏。

来自 documentation of Parallel,这是后台使用的进程池的实现。

Interruption of multiprocesses jobs with ‘Ctrl-C’

为此,您可以向子进程发送一个 SIGINT 信号,看看它是否真的有效。

import multiprocessing, signal, os

p = multiprocessing.Process(target=library_function, args=())
p.start()

# stop the job    
os.kill(p.pid, signal.SIGINT)
p.join()