python 具有多个输入参数但只有一个可迭代的函数中的多处理池

python multiprocessing pool in a function with multiple input parameter but only one iterable

我有一个带有多个参数的函数,iterable_tokendataframelabel_array。但是,函数中只有 iterable_token 是可迭代的。

def cross_tab(label,token_presence):
    A_token=0
    B_token=0
    C_token=0
    D_token=0
    for i,j in zip(list(label),list(token_presence)):
        if i==True and j==True:
            A_token+=1
        elif i==False and j==False:
            D_token+=1
        elif i==True and j==False:
            C_token+=1
        elif i==False and j==True:
            B_token+=1
    return A_token,B_token,C_token,D_token

def My_ParallelFunction(iterable_token,dataframe,label_array):
    A={}
    B={}
    C={}
    D={}
    token_count={}
    token_list=[]
    token_presence_sum=0
    i=0
    
    for token in iterable_token:
        try:
            token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
            token_presence_sum=sum(token_presence)
            if token_presence_sum:
                A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
                A[token]=A_token
                B[token]=B_token
                C[token]=C_token
                D[token]=D_token
                token_count[token]=token_presence_sum
                token_list.append(token)
        except Exception as e:
            pass
    return (A,B,C,D,token_count,token_list)

如何并行化 My_ParallelFunction 函数?

Edit1:我尝试了示例 1 中建议的方法,因为这就是我正在寻找的并行化函数。

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as p:
    results = p.starmap(My_ParallelFunction, (iterable_token, dataframe,label_array))

但错误信息是

RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 47, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
"""

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
<timed exec> in <module>

/usr/lib/python3.6/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
    272         `func` and (a, b) becomes func(a, b).
    273         '''
--> 274         return self._map_async(func, iterable, starmapstar, chunksize).get()
    275 
    276     def starmap_async(self, func, iterable, chunksize=None, callback=None,

/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    642             return self._value
    643         else:
--> 644             raise self._value
    645 
    646     def _set(self, i, obj):

TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given

Edit2: 这是我正在使用的文件。您可以从 here 下载并解压。此外,运行 下面的脚本可以获取所需的输入参数。确保安装 nltkpandasnumpy 并更改文件 TokenFile.csv.

的路径
from nltk import word_tokenize,sent_tokenize
import pandas as pd
import numpy as np

dataframe=pd.read_csv('/home/user/TokenFile.csv',nrows=100)

def get_uniquetoken(stop_words,input_doc_list):
    ##get unique words across all documents
    if stop_words:
        unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent) if word not in stop_words]
    else:
        unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent)]
    unique_words=set(unique_words)
    print('unique_words done! length is:',len(unique_words) )
    return unique_words


input_token_list=dataframe['Master'].tolist()
label_array=dataframe['label_array'].tolist()
iterable_token=get_uniquetoken(None,input_token_list)

编辑 3 这是我正在使用的解决方案

def My_ParallelFunction(iterable_token,dataframe,label_array):
    A={}
    B={}
    C={}
    D={}
    token_count={}
    token_list=[]
    i=0
    
    with mp.Pool(4) as p:
        token_result = p.starmap(_loop,[(token, dataframe, label_array,A,B,C,D,token_count,token_list) for token in iterable_token])
    #print(token_result[0])
    return token_result#(A,B,C,D,token_count,token_list)


def _loop(token, dataframe, label_array,A,B,C,D,token_count,token_list):
    #print(token)
    try:
        token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
        token_presence_sum=sum(token_presence)
        #print(token_presence_sum)
        if token_presence_sum:
            A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
            #print('token,A_token,B_token,C_token,D_token',token,A_token,B_token,C_token,D_token)
            A[token]=A_token
            B[token]=B_token
            C[token]=C_token
            D[token]=D_token
            token_count[token]=token_presence_sum
            token_list.append(token)
#             print('token_list:',token_list)
    except Exception as e:
        pass
    return A,B,C,D,token_count,token_list

但是它没有给我想要的结果。它是一个 949 X 6 X different_sizes 矩阵

这里有两个玩具示例,展示了如何并行化类似的函数。

第一个选项。如果你想并行化整个功能。您可以使用 Pool.starmap() 来做到这一点。 .starmap() 的工作方式类似于 map(),但您可以向其传递多个参数。

from multiprocessing import Pool
import time


#Example 1 Simple function parallelization
def f(a,b,c,_list):
    x = a+b+c
    time.sleep(1)
    _list.append(x)
    return _list

inputs = [
    (1,2,3,['a','b','c']),
    (1,2,3,['d','e','f']),
    (1,2,3,['x','y','z']),
    (1,2,3,['A','B','C']),
]

start = time.time()
with Pool(4) as p:
    results = p.starmap(f, inputs)
end = time.time()

for r in results:
    print(r)
    
print(f'done in {round(end-start, 3)} seconds')

输出:

['a', 'b', 'c', 6]
['d', 'e', 'f', 6]
['x', 'y', 'z', 6]
['A', 'B', 'C', 6]
done in 1.084 seconds

第二个选项。如果只想并行化函数内的 for-loop 。在这种情况下,您应该将循环重写为函数并使用 Pool.map() 或 Pool.starmap().

调用它
#Example 2. Function calling a parallel function

#loop
def g(_string):
    time.sleep(1)
    return _string + '*'

#outer function
def f(a,b,c,_list):
    x = a+b+c
    _list.append(str(x))
    #loop parallelization
    with Pool(4) as p:
        new_list = p.map(g, _list)
    return new_list

start = time.time()
result = f(1,2,3,['a','b','c'])
end = time.time()

print(result)
print(f'done in {round(end-start, 3)} seconds')

输出:

['a*', 'b*', 'c*', '6*']
done in 1.048 seconds

请注意,“循环函数”包含处理可迭代对象的单个元素的逻辑。 Pool.map() 将为所有元素处理 运行 它。

time.sleep(1)调用是为了模拟一些time-consuming计算。如果并行化有效,您应该能够在 1 秒内处理 4 个输入,而不是 4 秒。

这是一个使用您的代码的示例:

def My_ParallelFunction(iterable_token, dataframe, label_array):

    with mp.Pool(4) as p:
        token_result = p.starmap(
            _loop,
            [(token, dataframe, label_array) for token in iterable_token]
        )
    return token_result


def _loop(token, dataframe, label_array):
    A={}
    B={}
    C={}
    D={}
    token_count = {}
    token_list = []
    try:
        
        token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
        token_presence_sum=sum(token_presence)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            A[token]=A_token
            B[token]=B_token
            C[token]=C_token
            D[token]=D_token
            token_count[token]=token_presence_sum
            token_list.append(token)
            return A,B,C,D,token_count,token_list

    except Exception as e:
        print(e)

如果您只想多处理 for 循环而不是整个函数,那么按照这些思路应该可行。

from multiprocessing import Pool

def My_ParallelFunction(iterable_token,dataframe,label_array):
    def get_token_counts(token,dataframe,label_array):
        try:
            token_presence=dataframe['Master'].str.contains('\b'+token+'\b')
            token_presence_sum=sum(token_presence)
            if token_presence_sum:
                A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
                return token,A_token,B_token,C_token,D_token
        except Exception as e:
            print(e)
            pass
        
    A={}
    B={}
    C={}
    D={}
    token_count={}
    token_list=[]
    token_presence_sum=0
    i=0
    
    with Pool() as p:
        p_results = p.starmap(get_token_counts, [(token, dataframe, label_array) for token in iterable_token])
        
    for res in p_results:
        if res is None:
            continue
        token,A_token,B_token,C_token,D_token = res
        A[token]=A_token
        B[token]=B_token
        C[token]=C_token
        D[token]=D_token
        token_count[token]=token_presence_sum
        token_list.append(token)
    return (A,B,C,D,token_count,token_list)

我从工作函数中删除了将元素添加到列表和字典的部分,因为您必须查看队列或共享对象才能乘法附加到列表或字典。它的工作量更大,但应该使您的代码 运行 稍微快一些(这完全取决于您的可迭代对象中有多少元素以及需要花费大量时间来计算的元素)。

此代码背后的想法是您创建一个辅助函数 get_token_counts,它将 运行 放入每个线程,前提是它有一个 token、一个 dataframe 和一个 label_array。函数的 returning 部分包含将元素添加到字典所需的所有元素(因为你无法真正知道哪个线程先完成,你 return token 和它解决了你所有的索引问题。虽然,也许 starmap 保持参数的顺序,所以也许没有必要)。

计算完所有元素后,您可以继续将它们添加到列表和字典中。

这基本上是与 cross_tab 一起对一些数据帧函数进行多处理,不完全是 My_ParallelFunction

由于你没有给出示例,我无法真正测试代码并想出更好的东西。