Multiprocessing a pandas groupby apply in Python
I have two sets of data: one contains the rows to be processed as groups, and the other contains the groups to look them up against.
import pandas as pd

test = pd.DataFrame({'Address1': ['123 Cheese Way', '234 Cookie Place', '345 Pizza Drive', '456 Pretzel Junction'], 'city': ['X', 'U', 'X', 'U']})
test2 = pd.DataFrame({'Address1': ['123 chese wy', '234 kookie Pl', '345 Pizzza DR', '456 Pretzel Junktion'], 'city': ['X', 'U', 'Z', 'Y'], 'ID': ['1', '3', '4', '8']})

gr1 = test.groupby('city')
gr2 = test2.groupby('city')
Currently I am applying my function to each row of the groups,
gr1.apply(lambda x: custom_func(x.Address1, gr2.get_group(x.name)))
but I don't know how to run this with multiprocessing. Please advise.
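For context, custom_func is never shown in the question; a minimal hypothetical stand-in (the name, columns, and return shape are assumptions) that fuzzy-matches each address against the lookup group using only the standard library could look like this:

import difflib
import pandas as pd

def custom_func(addresses, lookup_group):
    # hypothetical: pair each source address with its best fuzzy match in the
    # lookup group's Address1 column
    rows = []
    for addr in addresses:
        best = max(lookup_group.Address1,
                   key=lambda cand: difflib.SequenceMatcher(None, addr, cand).ratio())
        score = difflib.SequenceMatcher(None, addr, best).ratio()
        rows.append({'Address1': addr, 'best_match': best, 'score': score})
    return pd.DataFrame(rows)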
EDIT: I tried using dask, but I could not pass the entire dataframe to my dask function because of the limitations of its apply function. I tried using dask's apply on my gr1 (the groups), but since I set an index inside my custom function, dask throws the error "Too many indexers".

With dask, the following gives me the error - 'Pandas' object has no attribute 'city':
import dask.dataframe as dd

ddf1 = dd.from_pandas(test, 2)
ddf2 = dd.from_pandas(test2, 2)
dgr1 = ddf1.groupby('city')
dgr2 = ddf2.groupby('city')
meta = pd.DataFrame(columns=['Address1', 'score', 'idx', 'source_index'])
# note: x is never defined at this point, which is part of the problem
ddf1.map_partitions(custom_func, x.Address1, dgr2.get_group(x.city).Address1, meta=meta).compute()
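For what it is worth, a minimal sketch of a working dask pattern (an assumption about the intent, not the OP's code) pushes the per-group work through groupby().apply with an explicit meta. It assumes custom_func returns a DataFrame with the columns of the hypothetical version sketched above; meta must match whatever the real function actually returns:

import pandas as pd
import dask.dataframe as dd

def per_group(grp):
    # grp arrives as a plain pandas DataFrame holding one city's rows;
    # look up the matching city's rows straight from the pandas test2 frame
    cand = test2[test2.city == grp['city'].iloc[0]]
    if cand.empty:  # city with no counterpart in the lookup table
        return pd.DataFrame(columns=['Address1', 'best_match', 'score'])
    return custom_func(grp.Address1, cand)

ddf1 = dd.from_pandas(test, npartitions=2)
result = (ddf1.groupby('city')
              .apply(per_group, meta={'Address1': 'object', 'best_match': 'object', 'score': 'f8'})
              .compute(scheduler='processes'))

The scheduler='processes' argument is what actually gives the multi-process execution asked for; the default threaded scheduler keeps everything in one process.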
I provide here an alternative solution to using dask, built on a plain multiprocessing.Pool:
import pandas as pd
from multiprocessing import Pool

test = pd.DataFrame({'Address1': ['123 Cheese Way', '234 Cookie Place', '345 Pizza Drive', '456 Pretzel Junction'], 'city': ['X', 'U', 'X', 'U']})
test2 = pd.DataFrame({'Address1': ['123 chese wy', '234 kookie Pl', '345 Pizzza DR', '456 Pretzel Junktion'], 'city': ['X', 'U', 'Z', 'Y'], 'ID': ['1', '3', '4', '8']})

test = test.assign(dataset='test')
test2 = test2.assign(dataset='test2')

# stack both datasets; the outer index level records which one a row came from
newdf = pd.concat([test2, test], keys=['test2', 'test'])
gpd = newdf.groupby('city')

def my_func(mygrp):
    # a city may exist in only one of the datasets (e.g. 'Z' and 'Y' here)
    test_data = mygrp.loc['test'] if 'test' in mygrp.index.get_level_values(0) else None
    test2_data = mygrp.loc['test2']
    # do something specific with test_data vs test2_data
    # if needed print something
    return {'Address': test2_data.Address1.values[0], 'ID': test2_data.ID.values[0]}  # return some other stuff

mypool = Pool(processes=2)
ret_list = mypool.imap(my_func, (group for name, group in gpd))
pd.DataFrame(ret_list)
returns something like

  ID               Address
0  3         234 kookie Pl
1  1          123 chese wy
2  8  456 Pretzel Junktion
3  4         345 Pizzza DR
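One practical caveat: on platforms that start workers by spawning a fresh interpreter (Windows, and macOS since Python 3.8), the Pool code must run under an if __name__ == '__main__': guard, and using the pool as a context manager (with Pool(processes=2) as mypool:) or calling mypool.close() afterwards avoids leaking worker processes.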
PS: in the OP's question, two similar datasets are compared in a dedicated function; the solution here uses pandas.concat. One could also imagine pd.merge, depending on the problem.
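As a rough sketch of that pd.merge variant (the suffix names are arbitrary, and this is an assumption about what the comparison needs, not the OP's code), aligning the two datasets on city up front gives each group a single flat frame with no MultiIndex handling:

# pair every test2 row with the test rows for the same city; overlapping
# column names are disambiguated by the suffixes
merged = test2.merge(test, on='city', how='left', suffixes=('_t2', '_t1'))
print(merged[['city', 'Address1_t2', 'Address1_t1', 'ID']])

The per-group worker would then read both sides from ordinary columns instead of mygrp.loc['test'] / mygrp.loc['test2'].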