Why am I getting an error during parallel processing?

I am passing dictionary keys and values to be processed in parallel:

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": data_preprocess.dataset_1,
        "Dataset_2": data_preprocess.dataset_2,
    }
    pool = mp.Pool(8)
    pool.starmap(main, zip(DATASETS.keys(), DATASETS.values()))
    pool.close()
# As I am not collecting any results and I save the output directly
# to a CSV file from the main function, I did not use pool.join()

The main function:

def main(dataset_name, generate_dataset):
    REGRESSORS = {
        "LinReg": LinearRegression(),
        "Lasso": Lasso(),
    }
    ROOT = Path(__file__).resolve().parent
    dfs = []
    for reg_name, regressor in REGRESSORS.items():
        df = function_calling(
            generate_dataset=generate_dataset,
            regressor=regressor,
            reg_name=reg_name,
        )
        print(df)
        dfs.append(df)
    df = pd.concat(dfs, axis=0, ignore_index=True)
    filename = dataset_name + "_result.csv"
    outfile = ROOT / filename
    df.to_csv(outfile)

I am getting the error AssertionError: daemonic processes are not allowed to have children. Can you tell me why this error occurs, and how can I fix it?
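The worker processes created by multiprocessing.Pool are daemonic, and daemonic processes are not allowed to spawn children of their own. So this error appears when something running inside a Pool worker (presumably function_calling or the data_preprocess dataset generators, which are not shown here) tries to create its own Pool or Process. The following minimal sketch is hypothetical, with a nested inner pool standing in for whatever your code actually spawns, but it reproduces the same AssertionError:

import multiprocessing as mp

def square(x):
    return x * x

def main(dataset_name, generate_dataset):
    # creating a nested Pool inside a daemonic Pool worker raises
    # AssertionError: daemonic processes are not allowed to have children
    with mp.Pool(2) as inner_pool:
        print(dataset_name, inner_pool.map(square, [generate_dataset]), flush=True)

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,
    }
    pool = mp.Pool(8)
    pool.starmap(main, zip(DATASETS.keys(), DATASETS.values()))
    pool.close()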

The simplest fix is to create your own Process instances instead of a Pool. Plain Process objects are not daemonic by default, so they are allowed to have children of their own:

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,
    }
    processes = [mp.Process(target=main, args=(k, v)) for k, v in DATASETS.items()]
    for process in processes:
        process.start()
    # wait for termination:
    for process in processes:
        process.join()

Prints:

Dataset_1 1
Dataset_2 2

Suppose you have 8 CPU cores and DATASETS has 100 key/value pairs: you would be creating 100 processes. Assuming those processes are CPU-bound, you cannot expect more than 8 of them to be doing productive work at any one moment, yet you still incur the CPU and memory overhead of creating all of them. But as long as the number of processes you create does not exceed the number of CPU cores you have, and your main function does not need to return a value back to the main process, this approach should be fine. (If you do have more datasets than cores, a simple batching workaround is sketched below.)
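As an illustration of that point, here is a minimal sketch, assuming a hypothetical DATASETS with 100 entries, that starts the processes in batches of at most mp.cpu_count() so you never have more live processes than cores:

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

if __name__ == "__main__":
    # hypothetical: far more datasets than CPU cores
    DATASETS = {f"Dataset_{i}": i for i in range(1, 101)}
    items = list(DATASETS.items())
    batch_size = mp.cpu_count()
    for start in range(0, len(items), batch_size):
        # start at most batch_size processes at a time
        processes = [
            mp.Process(target=main, args=(k, v))
            for k, v in items[start:start + batch_size]
        ]
        for process in processes:
            process.start()
        # wait for the whole batch before starting the next one
        for process in processes:
            process.join()

The drawback is that each batch waits for its slowest member before the next batch starts, which is one reason to prefer the queue-based pool described next.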

There is also a way to implement your own multiprocessing pool using such Process instances together with a Queue instance, but it is a bit more involved:

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

def worker(queue):
    while True:
        arg = queue.get()
        if arg is None:
            # signal to terminate
            break
        # unpack
        dataset_name, generate_dataset = arg
        main(dataset_name, generate_dataset)

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,
    }
    queue = mp.Queue()
    items = list(DATASETS.items())
    for k, v in items:
        # put the arguments on the queue
        queue.put((k, v))
    # number of processors we will be using:
    n_processors = min(mp.cpu_count(), len(items))
    for _ in range(n_processors):
        # sentinel value telling a worker there is no more work: one per worker process
        queue.put(None)
    processes = [mp.Process(target=worker, args=(queue,)) for _ in range(n_processors)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
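If, unlike the assumption above, main did need to return a value to the main process, a second Queue could carry the results back. This is only a sketch under that assumption; result_queue and the returned string are placeholders, not part of the original code:

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    # placeholder for the real work; returns something for the parent to collect
    return f"{dataset_name}: processed {generate_dataset}"

def worker(task_queue, result_queue):
    while True:
        arg = task_queue.get()
        if arg is None:
            # sentinel: no more work
            break
        dataset_name, generate_dataset = arg
        result_queue.put(main(dataset_name, generate_dataset))

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,
    }
    task_queue = mp.Queue()
    result_queue = mp.Queue()
    items = list(DATASETS.items())
    for item in items:
        task_queue.put(item)
    n_processors = min(mp.cpu_count(), len(items))
    for _ in range(n_processors):
        # one sentinel per worker process
        task_queue.put(None)
    processes = [mp.Process(target=worker, args=(task_queue, result_queue))
                 for _ in range(n_processors)]
    for process in processes:
        process.start()
    # collect one result per task before joining the workers
    results = [result_queue.get() for _ in items]
    for process in processes:
        process.join()
    print(results)

Note that the results are drained before the workers are joined; joining a process while items it has put on a Queue are still buffered can deadlock.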