Python 多处理 - 棘手的用例,包括传递参数
Python Multiprocessing - Tricky use-case, includes passing arguments
我在并行分配函数时遇到问题。
问题陈述: 我有 2 个坐标对列表,dfC
和 dfO
。对于 dfC
中的每个 obs,我正在计算有多少 dfO
落在半径 r
内。 我目前有一个工作函数,但我正在尝试看看是否可以并行处理它。
问题是:dfC
可以拆分并单独处理...但是 dfO
需要每个工人 100%。我的方法是,让我先让它并行工作——然后我会担心如何将 dfO
的完整副本分发给工作人员。除非有人能帮我解决这两个问题?
首先,这是设置所有内容的代码:
import pandas as pd
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, process
import traceback
from scipy.spatial import cKDTree
# create 2 dataframes with random "coordinates"
dfC=pd.DataFrame(np.random.np.random.randint(0,100,size=(50,2)), columns=list('xy'))
dfO=pd.DataFrame(np.random.np.random.randint(0,100,size=(500,2)), columns=list('jk'))
这是 dfC
的示例,dfO
看起来也很相似
+----+----+
| x | y |
+----+----+
| 35 | 5 |
+----+----+
| 96 | 18 |
+----+----+
| 23 | 25 |
+----+----+
| 20 | 7 |
+----+----+
| 74 | 54 |
+----+----+
接下来,这是一个像 charm 一样工作的函数。我没有单独传递所有参数,而是故意这样做 - 准备一个 main 函数并行调用它们(否则我找不到多处理的方法)。
# this function works on dfC, and adds a row which counts the number
# of objects in dfO which are within radius r
def worker_job(args):
try:
dfC, dfO, newcol, r = args
mxC=dfC.as_matrix()
mxO = dfO.as_matrix()
# magic tree stuff
C_Tree = cKDTree(mxC)
O_Tree = cKDTree(mxO)
listoflists = C_Tree.query_ball_tree(O_Tree, r, p=2.0, eps=0.0)
counts=[]
for i in listoflists:
counts.append(len(i))
s = pd.Series(counts)
dfC[newcol] = s.values
except:
raise
traceback.print_exc()
else:
return dfC
如果我这样创建我的参数:
args=[dfC,dfO,"new_column_name",3]
当我 运行 它本身时它完美地工作:
worker_job(args)
+----+----+-----------------+
| x | y | new_column_name |
+----+----+-----------------+
| 35 | 5 | 4 |
+----+----+-----------------+
| 96 | 18 | 1 |
+----+----+-----------------+
| 23 | 25 | 0 |
+----+----+-----------------+
| 20 | 7 | 1 |
+----+----+-----------------+
| 74 | 54 | 2 |
+----+----+-----------------+
现在,我尝试构建将并行控制并行工作器和 运行 这个东西的函数。这是我的最大努力:
# this function should control the multiprocessing
def Run_Parallel(Function, Num_Proc, args):
try:
pool = Pool(Num_Proc)
parts = pool.map(Function,args)
pool.close()
pool.join()
results_df = pd.concat(parts)
except:
pool.close()
pool.terminate()
traceback.print_exc()
else:
return results_df
这是行不通的。 Run_Parallel(worker_job,2,args)
抛出关于 ValueError: not enough values to unpack (expected 4, got 2)
的错误。在通过包装器时,参数列表一定发生了变化。
我正在寻找关于此错误的具体指导,并为知道如何解决更大问题的任何人加分——我需要我的池包含 100% 的 dfO
和一个子集dfC
为了效率。
答案是将参数作为列表列表传递。这也解决了拆分数据帧的另一个问题(我认为池默认会处理这个问题,但事实并非如此)。
正确的函数应该是这样的:
# this function should control the multiprocessing
def Run_Parallel(Function, Num_Proc, args):
dfC, dfO, newcol, r = args
# to make lists of lists
argslist=[]
dfOlist=[]
dfClist=[]
resultlist=[]
# split dfC into parts
Cparts=np.array_split(dfC, Num_Proc)
# build the lists
for i in range(Num_Proc):
argslist.append([Cparts[i],dfO,newcol,r])
try:
pool = Pool(Num_Proc)
parts = pool.map(Function,argslist)
pool.close()
pool.join()
results_df = pd.concat(parts)
except:
pool.close()
pool.terminate()
traceback.print_exc()
else:
return results_df
我在并行分配函数时遇到问题。
问题陈述: 我有 2 个坐标对列表,dfC
和 dfO
。对于 dfC
中的每个 obs,我正在计算有多少 dfO
落在半径 r
内。 我目前有一个工作函数,但我正在尝试看看是否可以并行处理它。
问题是:dfC
可以拆分并单独处理...但是 dfO
需要每个工人 100%。我的方法是,让我先让它并行工作——然后我会担心如何将 dfO
的完整副本分发给工作人员。除非有人能帮我解决这两个问题?
首先,这是设置所有内容的代码:
import pandas as pd
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, process
import traceback
from scipy.spatial import cKDTree
# create 2 dataframes with random "coordinates"
dfC=pd.DataFrame(np.random.np.random.randint(0,100,size=(50,2)), columns=list('xy'))
dfO=pd.DataFrame(np.random.np.random.randint(0,100,size=(500,2)), columns=list('jk'))
这是 dfC
的示例,dfO
看起来也很相似
+----+----+
| x | y |
+----+----+
| 35 | 5 |
+----+----+
| 96 | 18 |
+----+----+
| 23 | 25 |
+----+----+
| 20 | 7 |
+----+----+
| 74 | 54 |
+----+----+
接下来,这是一个像 charm 一样工作的函数。我没有单独传递所有参数,而是故意这样做 - 准备一个 main 函数并行调用它们(否则我找不到多处理的方法)。
# this function works on dfC, and adds a row which counts the number
# of objects in dfO which are within radius r
def worker_job(args):
try:
dfC, dfO, newcol, r = args
mxC=dfC.as_matrix()
mxO = dfO.as_matrix()
# magic tree stuff
C_Tree = cKDTree(mxC)
O_Tree = cKDTree(mxO)
listoflists = C_Tree.query_ball_tree(O_Tree, r, p=2.0, eps=0.0)
counts=[]
for i in listoflists:
counts.append(len(i))
s = pd.Series(counts)
dfC[newcol] = s.values
except:
raise
traceback.print_exc()
else:
return dfC
如果我这样创建我的参数:
args=[dfC,dfO,"new_column_name",3]
当我 运行 它本身时它完美地工作:
worker_job(args)
+----+----+-----------------+
| x | y | new_column_name |
+----+----+-----------------+
| 35 | 5 | 4 |
+----+----+-----------------+
| 96 | 18 | 1 |
+----+----+-----------------+
| 23 | 25 | 0 |
+----+----+-----------------+
| 20 | 7 | 1 |
+----+----+-----------------+
| 74 | 54 | 2 |
+----+----+-----------------+
现在,我尝试构建将并行控制并行工作器和 运行 这个东西的函数。这是我的最大努力:
# this function should control the multiprocessing
def Run_Parallel(Function, Num_Proc, args):
try:
pool = Pool(Num_Proc)
parts = pool.map(Function,args)
pool.close()
pool.join()
results_df = pd.concat(parts)
except:
pool.close()
pool.terminate()
traceback.print_exc()
else:
return results_df
这是行不通的。 Run_Parallel(worker_job,2,args)
抛出关于 ValueError: not enough values to unpack (expected 4, got 2)
的错误。在通过包装器时,参数列表一定发生了变化。
我正在寻找关于此错误的具体指导,并为知道如何解决更大问题的任何人加分——我需要我的池包含 100% 的 dfO
和一个子集dfC
为了效率。
答案是将参数作为列表列表传递。这也解决了拆分数据帧的另一个问题(我认为池默认会处理这个问题,但事实并非如此)。
正确的函数应该是这样的:
# this function should control the multiprocessing
def Run_Parallel(Function, Num_Proc, args):
dfC, dfO, newcol, r = args
# to make lists of lists
argslist=[]
dfOlist=[]
dfClist=[]
resultlist=[]
# split dfC into parts
Cparts=np.array_split(dfC, Num_Proc)
# build the lists
for i in range(Num_Proc):
argslist.append([Cparts[i],dfO,newcol,r])
try:
pool = Pool(Num_Proc)
parts = pool.map(Function,argslist)
pool.close()
pool.join()
results_df = pd.concat(parts)
except:
pool.close()
pool.terminate()
traceback.print_exc()
else:
return results_df