Why am I getting an error during parallel processing?
I am passing a dictionary's keys and values for parallel processing:
if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": data_preprocess.dataset_1,
        "Dataset_2": data_preprocess.dataset_2,
    }
    pool = mp.Pool(8)
    pool.starmap(main, zip(DATASETS.keys(), DATASETS.values()))
    pool.close()
    # Since I am not collecting any result and the output is saved to a CSV
    # file directly from main(), I did not use pool.join().
The main function:
def main(dataset_name, generate_dataset):
    REGRESSORS = {
        "LinReg": LinearRegression(),
        "Lasso": Lasso(),
    }
    ROOT = Path(__file__).resolve().parent
    dataset_name = dataset_name
    generate_dataset = generate_dataset
    dfs = []
    for reg_name, regressor in REGRESSORS.items():
        df = function_calling(
            generate_dataset=generate_dataset,
            regressor=regressor,
            reg_name=reg_name,
        )
        print(df)
        dfs.append(df)
    df = pd.concat(dfs, axis=0, ignore_index=True)
    filename = dataset_name + "_result.csv"
    outfile = str(PATH) + "/" + filename
    df.to_csv(outfile)
I am getting the error AssertionError: daemonic processes are not allowed to have children.
Can you tell me why this error occurs, and how can I resolve it?
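The error is raised because the worker processes that multiprocessing.Pool creates are daemon processes, and a daemon process is not allowed to start children of its own. So something called from main (quite possibly inside function_calling, or a library that uses multiprocessing under the hood) is trying to spawn its own processes. A minimal sketch that reproduces the same error by nesting a hypothetical Pool inside the worker (the inner Pool stands in for whatever nested parallelism your code attempts):

import multiprocessing as mp

def inner(x):
    # Placeholder for whatever nested parallel work is being attempted.
    return x * x

def main(dataset_name, generate_dataset):
    # A Pool worker is a daemon process; starting another Pool (or Process)
    # from inside it raises:
    # AssertionError: daemonic processes are not allowed to have children
    with mp.Pool(2) as inner_pool:
        print(dataset_name, inner_pool.map(inner, [generate_dataset]))

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        pool.starmap(main, [("Dataset_1", 1), ("Dataset_2", 2)])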
Just create your own Process instances instead:
import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,
    }
    processes = [mp.Process(target=main, args=(k, v)) for k, v in DATASETS.items()]
    for process in processes:
        process.start()
    # wait for termination:
    for process in processes:
        process.join()
This prints:
Dataset_1 1
Dataset_2 2
Suppose you had 8 CPU cores and DATASETS had 100 key/value pairs. You would then be creating 100 processes. Assuming the work is CPU-bound, you could not expect more than 8 of them to be doing anything productive at any one time, yet you would still incur the CPU and memory overhead of creating all of those processes. But as long as the number of processes you create is not greater than the number of CPU cores you have, and your function main does not need to return values back to the main process, this approach is fine.
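If DATASETS really did have far more entries than cores, one simple way to cap how many processes are alive at once is to launch them in batches of cpu_count() (a sketch; the batching loop and the enlarged DATASETS are illustrative assumptions, not part of the answer above):

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)

if __name__ == "__main__":
    # Hypothetical: many more datasets than CPU cores.
    DATASETS = {f"Dataset_{i}": i for i in range(1, 101)}
    items = list(DATASETS.items())
    batch_size = mp.cpu_count()
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        processes = [mp.Process(target=main, args=(k, v)) for k, v in batch]
        for process in processes:
            process.start()
        # wait for this batch to finish before starting the next one
        for process in processes:
            process.join()

The drawback is that each batch waits for its slowest member; the queue-based pool described next avoids that.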
There is also a way to implement your own multiprocessing pool with these Process instances and a Queue instance, but it is a bit more complicated:
import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

def worker(queue):
    while True:
        arg = queue.get()
        if arg is None:
            # signal to terminate
            break
        # unpack
        dataset_name, generate_dataset = arg
        main(dataset_name, generate_dataset)

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,
    }
    queue = mp.Queue()
    items = list(DATASETS.items())
    for k, v in items:
        # put the arguments on the queue
        queue.put((k, v))
    # number of processes we will be using:
    n_processors = min(mp.cpu_count(), len(items))
    for _ in range(n_processors):
        # special value to tell each worker there is no more work: one per process
        queue.put(None)
    processes = [mp.Process(target=worker, args=(queue,)) for _ in range(n_processors)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
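If main did need to return values to the parent process, one common pattern is to add a second Queue for the results (a sketch; result_queue and the modified main are hypothetical additions, not part of the answer above):

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    # Hypothetical: return a value instead of writing a CSV file.
    return dataset_name, generate_dataset * 10

def worker(task_queue, result_queue):
    while True:
        arg = task_queue.get()
        if arg is None:
            # signal to terminate
            break
        result_queue.put(main(*arg))

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,
    }
    task_queue = mp.Queue()
    result_queue = mp.Queue()
    items = list(DATASETS.items())
    for item in items:
        task_queue.put(item)
    n_processors = min(mp.cpu_count(), len(items))
    for _ in range(n_processors):
        # one sentinel per worker process
        task_queue.put(None)
    processes = [mp.Process(target=worker, args=(task_queue, result_queue))
                 for _ in range(n_processors)]
    for process in processes:
        process.start()
    # collect one result per submitted task before joining the workers
    results = [result_queue.get() for _ in items]
    for process in processes:
        process.join()
    print(results)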