python 具有多个输入参数但只有一个可迭代的函数中的多处理池
python multiprocessing pool in a function with multiple input parameter but only one iterable
我有一个带有多个参数的函数,iterable_token
、dataframe
、label_array
。但是,函数中只有 iterable_token
是可迭代的。
def cross_tab(label,token_presence):
A_token=0
B_token=0
C_token=0
D_token=0
for i,j in zip(list(label),list(token_presence)):
if i==True and j==True:
A_token+=1
elif i==False and j==False:
D_token+=1
elif i==True and j==False:
C_token+=1
elif i==False and j==True:
B_token+=1
return A_token,B_token,C_token,D_token
def My_ParallelFunction(iterable_token,dataframe,label_array):
A={}
B={}
C={}
D={}
token_count={}
token_list=[]
token_presence_sum=0
i=0
for token in iterable_token:
try:
token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
token_presence_sum=sum(token_presence)
if token_presence_sum:
A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
A[token]=A_token
B[token]=B_token
C[token]=C_token
D[token]=D_token
token_count[token]=token_presence_sum
token_list.append(token)
except Exception as e:
pass
return (A,B,C,D,token_count,token_list)
如何并行化 My_ParallelFunction
函数?
Edit1:我尝试了示例 1 中建议的方法,因为这就是我正在寻找的并行化函数。
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as p:
results = p.starmap(My_ParallelFunction, (iterable_token, dataframe,label_array))
但错误信息是
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 47, in starmapstar
return list(itertools.starmap(args[0], args[1]))
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
"""
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<timed exec> in <module>
/usr/lib/python3.6/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
272 `func` and (a, b) becomes func(a, b).
273 '''
--> 274 return self._map_async(func, iterable, starmapstar, chunksize).get()
275
276 def starmap_async(self, func, iterable, chunksize=None, callback=None,
/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
642 return self._value
643 else:
--> 644 raise self._value
645
646 def _set(self, i, obj):
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
Edit2: 这是我正在使用的文件。您可以从 here 下载并解压。此外,运行 下面的脚本可以获取所需的输入参数。确保安装 nltk
、pandas
和 numpy
并更改文件 TokenFile.csv
.
的路径
from nltk import word_tokenize,sent_tokenize
import pandas as pd
import numpy as np
dataframe=pd.read_csv('/home/user/TokenFile.csv',nrows=100)
def get_uniquetoken(stop_words,input_doc_list):
##get unique words across all documents
if stop_words:
unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent) if word not in stop_words]
else:
unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent)]
unique_words=set(unique_words)
print('unique_words done! length is:',len(unique_words) )
return unique_words
input_token_list=dataframe['Master'].tolist()
label_array=dataframe['label_array'].tolist()
iterable_token=get_uniquetoken(None,input_token_list)
编辑 3 这是我正在使用的解决方案
def My_ParallelFunction(iterable_token,dataframe,label_array):
A={}
B={}
C={}
D={}
token_count={}
token_list=[]
i=0
with mp.Pool(4) as p:
token_result = p.starmap(_loop,[(token, dataframe, label_array,A,B,C,D,token_count,token_list) for token in iterable_token])
#print(token_result[0])
return token_result#(A,B,C,D,token_count,token_list)
def _loop(token, dataframe, label_array,A,B,C,D,token_count,token_list):
#print(token)
try:
token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
token_presence_sum=sum(token_presence)
#print(token_presence_sum)
if token_presence_sum:
A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
#print('token,A_token,B_token,C_token,D_token',token,A_token,B_token,C_token,D_token)
A[token]=A_token
B[token]=B_token
C[token]=C_token
D[token]=D_token
token_count[token]=token_presence_sum
token_list.append(token)
# print('token_list:',token_list)
except Exception as e:
pass
return A,B,C,D,token_count,token_list
但是它没有给我想要的结果。它是一个 949 X 6 X different_sizes 矩阵
这里有两个玩具示例,展示了如何并行化类似的函数。
第一个选项。如果你想并行化整个功能。您可以使用 Pool.starmap() 来做到这一点。 .starmap() 的工作方式类似于 map(),但您可以向其传递多个参数。
from multiprocessing import Pool
import time
#Example 1 Simple function parallelization
def f(a,b,c,_list):
x = a+b+c
time.sleep(1)
_list.append(x)
return _list
inputs = [
(1,2,3,['a','b','c']),
(1,2,3,['d','e','f']),
(1,2,3,['x','y','z']),
(1,2,3,['A','B','C']),
]
start = time.time()
with Pool(4) as p:
results = p.starmap(f, inputs)
end = time.time()
for r in results:
print(r)
print(f'done in {round(end-start, 3)} seconds')
输出:
['a', 'b', 'c', 6]
['d', 'e', 'f', 6]
['x', 'y', 'z', 6]
['A', 'B', 'C', 6]
done in 1.084 seconds
第二个选项。如果只想并行化函数内的 for-loop 。在这种情况下,您应该将循环重写为函数并使用 Pool.map() 或 Pool.starmap().
调用它
#Example 2. Function calling a parallel function
#loop
def g(_string):
time.sleep(1)
return _string + '*'
#outer function
def f(a,b,c,_list):
x = a+b+c
_list.append(str(x))
#loop parallelization
with Pool(4) as p:
new_list = p.map(g, _list)
return new_list
start = time.time()
result = f(1,2,3,['a','b','c'])
end = time.time()
print(result)
print(f'done in {round(end-start, 3)} seconds')
输出:
['a*', 'b*', 'c*', '6*']
done in 1.048 seconds
请注意,“循环函数”包含处理可迭代对象的单个元素的逻辑。 Pool.map() 将为所有元素处理 运行 它。
time.sleep(1)
调用是为了模拟一些time-consuming计算。如果并行化有效,您应该能够在 1 秒内处理 4 个输入,而不是 4 秒。
这是一个使用您的代码的示例:
def My_ParallelFunction(iterable_token, dataframe, label_array):
with mp.Pool(4) as p:
token_result = p.starmap(
_loop,
[(token, dataframe, label_array) for token in iterable_token]
)
return token_result
def _loop(token, dataframe, label_array):
A={}
B={}
C={}
D={}
token_count = {}
token_list = []
try:
token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
token_presence_sum=sum(token_presence)
if token_presence_sum:
A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
A[token]=A_token
B[token]=B_token
C[token]=C_token
D[token]=D_token
token_count[token]=token_presence_sum
token_list.append(token)
return A,B,C,D,token_count,token_list
except Exception as e:
print(e)
如果您只想多处理 for 循环而不是整个函数,那么按照这些思路应该可行。
from multiprocessing import Pool
def My_ParallelFunction(iterable_token,dataframe,label_array):
def get_token_counts(token,dataframe,label_array):
try:
token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
token_presence_sum=sum(token_presence)
if token_presence_sum:
A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
return token,A_token,B_token,C_token,D_token
except Exception as e:
print(e)
pass
A={}
B={}
C={}
D={}
token_count={}
token_list=[]
token_presence_sum=0
i=0
with Pool() as p:
p_results = p.starmap(get_token_counts, [(token, dataframe, label_array) for token in iterable_token])
for res in p_results:
if res is None:
continue
token,A_token,B_token,C_token,D_token = res
A[token]=A_token
B[token]=B_token
C[token]=C_token
D[token]=D_token
token_count[token]=token_presence_sum
token_list.append(token)
return (A,B,C,D,token_count,token_list)
我从工作函数中删除了将元素添加到列表和字典的部分,因为您必须查看队列或共享对象才能乘法附加到列表或字典。它的工作量更大,但应该使您的代码 运行 稍微快一些(这完全取决于您的可迭代对象中有多少元素以及需要花费大量时间来计算的元素)。
此代码背后的想法是您创建一个辅助函数 get_token_counts
,它将 运行 放入每个线程,前提是它有一个 token
、一个 dataframe
和一个 label_array
。函数的 returning 部分包含将元素添加到字典所需的所有元素(因为你无法真正知道哪个线程先完成,你 return token
和它解决了你所有的索引问题。虽然,也许 starmap 保持参数的顺序,所以也许没有必要)。
计算完所有元素后,您可以继续将它们添加到列表和字典中。
这基本上是与 cross_tab
一起对一些数据帧函数进行多处理,不完全是 My_ParallelFunction
。
由于你没有给出示例,我无法真正测试代码并想出更好的东西。
我有一个带有多个参数的函数,iterable_token
、dataframe
、label_array
。但是,函数中只有 iterable_token
是可迭代的。
def cross_tab(label,token_presence):
A_token=0
B_token=0
C_token=0
D_token=0
for i,j in zip(list(label),list(token_presence)):
if i==True and j==True:
A_token+=1
elif i==False and j==False:
D_token+=1
elif i==True and j==False:
C_token+=1
elif i==False and j==True:
B_token+=1
return A_token,B_token,C_token,D_token
def My_ParallelFunction(iterable_token,dataframe,label_array):
A={}
B={}
C={}
D={}
token_count={}
token_list=[]
token_presence_sum=0
i=0
for token in iterable_token:
try:
token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
token_presence_sum=sum(token_presence)
if token_presence_sum:
A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
A[token]=A_token
B[token]=B_token
C[token]=C_token
D[token]=D_token
token_count[token]=token_presence_sum
token_list.append(token)
except Exception as e:
pass
return (A,B,C,D,token_count,token_list)
如何并行化 My_ParallelFunction
函数?
Edit1:我尝试了示例 1 中建议的方法,因为这就是我正在寻找的并行化函数。
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as p:
results = p.starmap(My_ParallelFunction, (iterable_token, dataframe,label_array))
但错误信息是
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 47, in starmapstar
return list(itertools.starmap(args[0], args[1]))
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
"""
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<timed exec> in <module>
/usr/lib/python3.6/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
272 `func` and (a, b) becomes func(a, b).
273 '''
--> 274 return self._map_async(func, iterable, starmapstar, chunksize).get()
275
276 def starmap_async(self, func, iterable, chunksize=None, callback=None,
/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
642 return self._value
643 else:
--> 644 raise self._value
645
646 def _set(self, i, obj):
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
Edit2: 这是我正在使用的文件。您可以从 here 下载并解压。此外,运行 下面的脚本可以获取所需的输入参数。确保安装 nltk
、pandas
和 numpy
并更改文件 TokenFile.csv
.
from nltk import word_tokenize,sent_tokenize
import pandas as pd
import numpy as np
dataframe=pd.read_csv('/home/user/TokenFile.csv',nrows=100)
def get_uniquetoken(stop_words,input_doc_list):
##get unique words across all documents
if stop_words:
unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent) if word not in stop_words]
else:
unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent)]
unique_words=set(unique_words)
print('unique_words done! length is:',len(unique_words) )
return unique_words
input_token_list=dataframe['Master'].tolist()
label_array=dataframe['label_array'].tolist()
iterable_token=get_uniquetoken(None,input_token_list)
编辑 3 这是我正在使用的解决方案
def My_ParallelFunction(iterable_token,dataframe,label_array):
A={}
B={}
C={}
D={}
token_count={}
token_list=[]
i=0
with mp.Pool(4) as p:
token_result = p.starmap(_loop,[(token, dataframe, label_array,A,B,C,D,token_count,token_list) for token in iterable_token])
#print(token_result[0])
return token_result#(A,B,C,D,token_count,token_list)
def _loop(token, dataframe, label_array,A,B,C,D,token_count,token_list):
#print(token)
try:
token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
token_presence_sum=sum(token_presence)
#print(token_presence_sum)
if token_presence_sum:
A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
#print('token,A_token,B_token,C_token,D_token',token,A_token,B_token,C_token,D_token)
A[token]=A_token
B[token]=B_token
C[token]=C_token
D[token]=D_token
token_count[token]=token_presence_sum
token_list.append(token)
# print('token_list:',token_list)
except Exception as e:
pass
return A,B,C,D,token_count,token_list
但是它没有给我想要的结果。它是一个 949 X 6 X different_sizes 矩阵
这里有两个玩具示例,展示了如何并行化类似的函数。
第一个选项。如果你想并行化整个功能。您可以使用 Pool.starmap() 来做到这一点。 .starmap() 的工作方式类似于 map(),但您可以向其传递多个参数。
from multiprocessing import Pool
import time
#Example 1 Simple function parallelization
def f(a,b,c,_list):
x = a+b+c
time.sleep(1)
_list.append(x)
return _list
inputs = [
(1,2,3,['a','b','c']),
(1,2,3,['d','e','f']),
(1,2,3,['x','y','z']),
(1,2,3,['A','B','C']),
]
start = time.time()
with Pool(4) as p:
results = p.starmap(f, inputs)
end = time.time()
for r in results:
print(r)
print(f'done in {round(end-start, 3)} seconds')
输出:
['a', 'b', 'c', 6]
['d', 'e', 'f', 6]
['x', 'y', 'z', 6]
['A', 'B', 'C', 6]
done in 1.084 seconds
第二个选项。如果只想并行化函数内的 for-loop 。在这种情况下,您应该将循环重写为函数并使用 Pool.map() 或 Pool.starmap().
调用它#Example 2. Function calling a parallel function
#loop
def g(_string):
time.sleep(1)
return _string + '*'
#outer function
def f(a,b,c,_list):
x = a+b+c
_list.append(str(x))
#loop parallelization
with Pool(4) as p:
new_list = p.map(g, _list)
return new_list
start = time.time()
result = f(1,2,3,['a','b','c'])
end = time.time()
print(result)
print(f'done in {round(end-start, 3)} seconds')
输出:
['a*', 'b*', 'c*', '6*']
done in 1.048 seconds
请注意,“循环函数”包含处理可迭代对象的单个元素的逻辑。 Pool.map() 将为所有元素处理 运行 它。
time.sleep(1)
调用是为了模拟一些time-consuming计算。如果并行化有效,您应该能够在 1 秒内处理 4 个输入,而不是 4 秒。
这是一个使用您的代码的示例:
def My_ParallelFunction(iterable_token, dataframe, label_array):
with mp.Pool(4) as p:
token_result = p.starmap(
_loop,
[(token, dataframe, label_array) for token in iterable_token]
)
return token_result
def _loop(token, dataframe, label_array):
A={}
B={}
C={}
D={}
token_count = {}
token_list = []
try:
token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
token_presence_sum=sum(token_presence)
if token_presence_sum:
A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
A[token]=A_token
B[token]=B_token
C[token]=C_token
D[token]=D_token
token_count[token]=token_presence_sum
token_list.append(token)
return A,B,C,D,token_count,token_list
except Exception as e:
print(e)
如果您只想多处理 for 循环而不是整个函数,那么按照这些思路应该可行。
from multiprocessing import Pool
def My_ParallelFunction(iterable_token,dataframe,label_array):
def get_token_counts(token,dataframe,label_array):
try:
token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
token_presence_sum=sum(token_presence)
if token_presence_sum:
A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
return token,A_token,B_token,C_token,D_token
except Exception as e:
print(e)
pass
A={}
B={}
C={}
D={}
token_count={}
token_list=[]
token_presence_sum=0
i=0
with Pool() as p:
p_results = p.starmap(get_token_counts, [(token, dataframe, label_array) for token in iterable_token])
for res in p_results:
if res is None:
continue
token,A_token,B_token,C_token,D_token = res
A[token]=A_token
B[token]=B_token
C[token]=C_token
D[token]=D_token
token_count[token]=token_presence_sum
token_list.append(token)
return (A,B,C,D,token_count,token_list)
我从工作函数中删除了将元素添加到列表和字典的部分,因为您必须查看队列或共享对象才能乘法附加到列表或字典。它的工作量更大,但应该使您的代码 运行 稍微快一些(这完全取决于您的可迭代对象中有多少元素以及需要花费大量时间来计算的元素)。
此代码背后的想法是您创建一个辅助函数 get_token_counts
,它将 运行 放入每个线程,前提是它有一个 token
、一个 dataframe
和一个 label_array
。函数的 returning 部分包含将元素添加到字典所需的所有元素(因为你无法真正知道哪个线程先完成,你 return token
和它解决了你所有的索引问题。虽然,也许 starmap 保持参数的顺序,所以也许没有必要)。
计算完所有元素后,您可以继续将它们添加到列表和字典中。
这基本上是与 cross_tab
一起对一些数据帧函数进行多处理,不完全是 My_ParallelFunction
。
由于你没有给出示例,我无法真正测试代码并想出更好的东西。