Python 中的 Keras + Tensorflow 和多处理

Keras + Tensorflow and Multiprocessing in Python

我使用 Keras 和 Tensorflow 作为后端。

我试图在我的主进程中保存一个模型,然后在另一个进程中 load/run(即调用 model.predict)。

我目前只是在尝试从文档到 save/load 模型的天真方法:https://keras.io/getting-started/faq/#how-can-i-save-a-keras-model
所以基本上:

  1. model.save() 在主进程中
  2. model = load_model() 在子进程中
  3. model.predict() 在子进程中

但是,它只是挂在 load_model 调用上。

四处搜索,我发现了这个可能相关的答案,表明 Keras 只能在一个过程中使用:using multiprocessing with theano 但我不确定这是否属实(似乎找不到太多相关信息)。

有没有办法实现我的目标?非常感谢高级描述或简短示例。

注意:我尝试过将图表传递给流程的方法,但失败了,因为它似乎无法选择张量流图表(此处相关的 SO post: ).如果确实有办法将 tensorflow graph/model 传递给子进程,那么我也愿意接受。

谢谢!

根据我的经验 - 问题在于将 Keras 加载到一个进程,然后在 keras 加载到您的主环境时生成一个新进程。但是对于某些应用程序(例如训练 Keras 模型的混合),将所有这些事情都放在一个过程中会更好。所以我的建议是以下(有点麻烦 - 但对我有用)方法:

  1. 不要将 KERAS 加载到您的主要环境中。如果你想加载 Keras / Theano / TensorFlow,只能在函数环境中进行。例如。 不要这样做:

    import keras
    
    def training_function(...):
        ...
    

    但请执行以下操作:

    def training_function(...):
        import keras
        ...
    
  2. 运行 在单独的过程中与每个模型相关联的工作: 我通常创建正在完成工作的工人(例如培训、调整、评分),我 运行 他们在不同的进程中。当您的进程完成时,此进程使用的整个内存完全 释放 有什么好处。这可以帮助您解决在一个进程中使用多处理甚至 运行 多个模型时通常会遇到的大量内存问题。所以这看起来例如像这样:

    def _training_worker(train_params):
        import keras
        model = obtain_model(train_params)
        model.fit(train_params)
        send_message_to_main_process(...)
    
    def train_new_model(train_params):
        training_process = multiprocessing.Process(target=_training_worker, args = train_params)
        training_process.start()
        get_message_from_training_process(...)
        training_process.join()
    

不同的方法只是为不同的模型动作准备不同的脚本。但这可能会导致内存错误,尤其是当您的模型正在消耗内存时。 注意由于这个原因,最好严格按顺序执行。

我创建了一个简单示例来展示如何 运行 Keras 在具有多个 GPU 的多个进程中建模。希望这个示例可以帮助你。 https://github.com/yuanyuanli85/Keras-Multiple-Process-Prediction

我创建了一个装饰器来修复我的代码。

from multiprocessing import Pipe, Process

def child_process(func):
    """Makes the function run as a separate process."""
    def wrapper(*args, **kwargs):
        def worker(conn, func, args, kwargs):
            conn.send(func(*args, **kwargs))
            conn.close()
        parent_conn, child_conn = Pipe()
        p = Process(target=worker, args=(child_conn, func, args, kwargs))
        p.start()
        ret = parent_conn.recv()
        p.join()
        return ret
return wrapper

@child_process
def keras_stuff():
    """ Keras stuff here"""