多进程 API

Multiprocessing API

# Number of cross-validation folds; supplied as the `cv` value when scoring.
num_folds = 3

def callModelScore(model, datax, datay, scoringType, folds):
    """Cross-validate ``model`` and print its mean score as a percentage.

    Args:
        model: Estimator forwarded to ``model_selection.cross_val_score``.
        datax: Feature data forwarded as the ``X`` argument.
        datay: Target data forwarded as the ``y`` argument.
        scoringType: Scoring name (a string such as ``'accuracy'``); it is
            also used as the label of the printed line.
        folds: Value forwarded as the ``cv`` argument.

    Returns:
        The scores object returned by ``cross_val_score``.
    """
    # Use the arguments rather than module-level globals: a worker process
    # started with "spawn" does not inherit notebook globals, and reading
    # them here would silently ignore whatever the caller passed in.
    rating = model_selection.cross_val_score(
        model, datax, datay, scoring=scoringType, cv=folds)
    meanRating = str(round(100 * rating.mean(), 2))
    print(scoringType + " " + meanRating + "%")
    return rating

from multiprocessing import Process

# Launch four identical scoring workers and wait for all of them.
#
# The launch is guarded because, with the "spawn" start method (the default
# on Windows), every child re-imports the main module; unguarded top-level
# Process creation would then be re-executed inside each child.
# NOTE: the guard alone is not sufficient in an interactive session. The
# target function must live in a module the children can import, so define
# callModelScore in a .py file and import it when running from a notebook.
# NOTE: Process discards the target's return value; only the printed output
# of callModelScore is visible from these workers.
if __name__ == "__main__":
    workers = [
        Process(target=callModelScore,
                args=(gnb, X, y, 'accuracy', num_folds))
        for _ in range(4)
    ]
    # Keep the original names available for any later cells that use them.
    p1, p2, p3, p4 = workers

    # Start every worker before joining any, so they run concurrently.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

错误:

BrokenPipeError                           Traceback (most recent call last)
<ipython-input-22-0aaad613b937> in <module>
      5 p3 = Process(target =  callModelScore,args =(gnb, X, y, 'accuracy', num_folds,))
      6 p4 = Process(target =  callModelScore,args =(gnb, X, y, 'accuracy', num_folds,))
----> 7 p1.start()
      8 p2.start()
      9 p3.start()

~\Anaconda3\lib\multiprocessing\process.py in start(self)
    110                'daemonic processes are not allowed to have children'
    111         _cleanup()
--> 112         self._popen = self._Popen(self)
    113         self._sentinel = self._popen.sentinel
    114         # Avoid a refcycle if the target function holds an indirect

~\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
    221     @staticmethod
    222     def _Popen(process_obj):
--> 223         return _default_context.get_context().Process._Popen(process_obj)
    224
    225 class DefaultContext(BaseContext):

~\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
    320         def _Popen(process_obj):
    321             from .popen_spawn_win32 import Popen
--> 322             return Popen(process_obj)
    323
    324     class SpawnContext(BaseContext):

~\Anaconda3\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     87             try:
     88                 reduction.dump(prep_data, to_child)
---> 89                 reduction.dump(process_obj, to_child)
     90             finally:
     91                 set_spawning_popen(None)

~\Anaconda3\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61
     62 #

BrokenPipeError: [Errno 32] Broken pipe

我正在用 Python 的 multiprocessing API 做测试,但似乎什么都运行不起来。我甚至尝试了 Python 3 文档中的一些示例代码。我使用的是 Anaconda 自带的 Jupyter Notebook。

我不是这方面的专家,但我认为问题在于 Jupyter Notebook 已经使用 pickle 进行数据序列化,这使得子进程与生成它们的主进程之间的数据流产生歧义。幸运的是,multiprocessing 有一个似乎仍在积极维护的分支项目,参见 multiprocess API (note: the name differs only by the missing "ing"). This API uses dill instead of pickle, which, aside from being hilarious, should allow your shell to differentiate between the children and the main process. This is also alluded to in the documentation:

Functionality within this package requires that the main module be importable by the children. This is covered in Programming guidelines however it is worth pointing out here. This means that some examples, such as the multiprocessing.pool.Pool examples will not work in the interactive interpreter.