parent 和 child 之间的通信进程在 Python 中分叉时

Communication between parent and child process when forking in Python

我试图让一个 Python 程序同时 运行 一个处理循环,以及一个结果广播服务,使用对 os.fork() 的调用,类似于

pid = os.fork()
if pid == 0:
    time.sleep(3)
    keep_updating_some_value_while_parent_is_running()
else:
    broadcast_value()

此处 keep_updating_some_value_while_parent_is_running() 由 child 执行,存储一些值,只要 parent 为 运行ning,它就会不断更新。它实际上将值写入磁盘,以便 parent 可以轻松访问它。它通过检查 运行 的 Web 服务是否可用来检测 parent 是否 运行ning。

broadcast_value() 运行 是一项 Web 服务,在查询时它会从磁盘读取最新值并提供服务。

这个实现效果很好,但由于以下几个原因并不令人满意:

  1. time.sleep(3)是必需的,因为网络服务需要 一些启动时间。根本无法保证服务会在 3 秒内启动并 运行ning,而另一方面可能会早得多。

  2. 通过磁盘共享数据并不总是一个好的选择,甚至不可能(所以这个解决方案不能很好地概括)。

  3. 通过检查 Web 服务是否可用来检测 parent 正在 运行ning 并不是最佳选择,而且对于不同类型的进程(无法轮询)自动如此容易)这根本行不通。此外,Web 服务可能 运行 正常,但暂时存在可用性问题。

  4. 解OS依赖。

  5. 当child由于某种原因失败或退出时,parent将继续 运行ning(这可能是期望的行为,但并非总是如此)。

我想要的是让 child 进程知道 parent 何时启动和 运行ning 以及何时停止的某种方式,以及 parent 根据请求获得由 child 计算的最新值,最好以 OS 独立的方式。也欢迎涉及 non-standard 个库的解决方案。

我建议使用 multiprocessing 而不是 os.fork(),因为它会为您处理很多细节。特别是它提供了 Manager class,它提供了一种在进程之间共享数据的好方法。您将启动一个 Process 来处理获取数据,另一个用于执行 Web 服务,然后将 Manager 提供的共享数据字典传递给它们。然后主进程只负责设置所有这些(并等待进程完成 - 否则 Manager 中断)。

这可能是这样的:

import time
from multiprocessing import Manager, Process

def get_data():
    """ Does the actual work of getting the updating value. """

def update_the_data(shared_dict):
    while not shared_dict.get('server_started'):
        time.sleep(.1)
    while True:
        shared_dict['data'] = get_data()
        shared_dict['data_timestamp'] = time.time()
        time.sleep(LOOP_DELAY)


def serve_the_data(shared_dict):
    server = initialize_server() # whatever this looks like
    shared_dict['server_started'] = True
    while True:
        server.serve_with_timeout()
        if time.time() - shared_dict['data_timestamp'] > 30:
            # child hasn't updated data for 30 seconds; problem?
            handle_child_problem()


if __name__ == '__main__':
    manager = Manager()
    shared_dict = manager.dict()
    processes = [Process(target=update_the_data, args=(shared_dict,)),
        Process(target=serve_the_data, args=(shared_dict,))]
    for process in processes:
        process.start()
    for process in processes:
        process.join()