为什么多处理比 Pandas 中的简单计算慢?
why is multiprocessing slower than a simple computation in Pandas?
这与
有关
再次考虑这个简单(但有趣)的例子:
import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd
master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'Whosebug is nice']})
slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'Whosebug is nice'],'my_value': [1,2,3,4,5]})
def fuzzy_score(str1, str2):
return fuzz.token_set_ratio(str1, str2)
def helper(orig_string, slave_df):
slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
#return my_value corresponding to the highest score
return slave_df.loc[slave_df.score.idxmax(),'my_value']
master
Out[39]:
original
0 this is a nice sentence
1 this is another one
2 Whosebug is nice
slave
Out[40]:
my_value name
0 1 hello world
1 2 congratulations
2 3 this is a nice sentence
3 4 this is another one
4 5 Whosebug is nice
我需要做的很简单:
- 对于
master
中的每一行,我使用 fuzzywuzzy
. 计算的字符串相似度得分查找数据框 slave
以获得最佳匹配
现在让我们把这些数据帧放大一点:
master = pd.concat([master] * 100, ignore_index = True)
slave = pd.concat([slave] * 10, ignore_index = True)
首先,我试过 dask
#prepare the computation
dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))
现在是时间:
#multithreaded
%timeit dmaster.compute(get=dask.threaded.get)
1 loop, best of 3: 346 ms per loop
#multiprocess
%timeit dmaster.compute(get=dask.multiprocessing.get)
1 loop, best of 3: 1.93 s per loop
#good 'ol pandas
%timeit master['my_value'] = master.original.apply(lambda x: helper(x,slave))
100 loops, best of 3: 2.18 ms per loop
其次,我已经尝试过旧的multiprocessing
包
from multiprocessing import Pool, cpu_count
def myfunc(df):
return df.original.apply(lambda x: helper(x, slave))
from datetime import datetime
if __name__ == '__main__':
startTime = datetime.now()
p = Pool(cpu_count() - 1)
ret_list = p.map(myfunc, [master.iloc[1:100,], master.iloc[100:200 ,],
master.iloc[200:300 ,]])
results = pd.concat(ret_list)
print datetime.now() - startTime
给出的时间差不多
runfile('C:/Users/john/untitled6.py', wdir='C:/Users/john')
0:00:01.927000
问题:为什么 Dask
和 multiprocessing
的多处理速度比 Pandas 慢?假设我的真实数据比这大得多。我能得到更好的结果吗?
毕竟我这里考虑的问题是embarassingly parallel
(每一行都是一个独立的问题),所以这些包应该真的大放异彩。
我是不是遗漏了什么?
谢谢!
让我把我的评论总结成一个答案。我希望这些信息有用,因为这里有很多问题。
首先,我想向您指出 distributed.readthedocs.io/en/latest/efficiency.html ,其中讨论了许多 dask 性能主题。请注意,这都是关于分布式调度程序的,但由于它可以在进程内启动,可以使用线程或进程,或这些的组合,它确实取代了以前的 dask 调度程序,并且通常在所有情况下都推荐使用。
1) 创建进程需要时间。这总是正确的,在 windows 上尤其如此。如果您对现实生活中的性能感兴趣,您将只想创建一次进程,开销固定,并且 运行 许多任务。在 dask 中,有 many ways 创建集群,甚至在本地。
2) dask(或任何其他调度程序)处理的每个任务都会产生一些开销。在分布式调度程序的情况下,这是 <1ms,但在任务本身的 运行time 很短的情况下,这可能很重要。
3) 将整个数据集加载到客户端并将其传递给 worker(s) 是 dask 中的一种反模式。相反,您希望使用像 dask.dataframe.read_csv
这样的函数,其中数据由工作人员加载,避免昂贵的序列化和进程间通信。 Dask 非常擅长将计算移动到数据所在的位置,从而最大限度地减少通信。
4) 当进程之间通信时,序列化的方法很重要,这就是我对为什么非 dask 多处理对你来说这么慢的猜测。
5) 最后,并非所有作业都能在 dask 下获得性能提升。这取决于很多因素,但通常最主要的因素是:数据是否适合内存?如果是,可能很难匹配 numpy 和 pandas 中优化良好的方法。一如既往,您应该始终分析您的代码...
这与
再次考虑这个简单(但有趣)的例子:
import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd
master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'Whosebug is nice']})
slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'Whosebug is nice'],'my_value': [1,2,3,4,5]})
def fuzzy_score(str1, str2):
return fuzz.token_set_ratio(str1, str2)
def helper(orig_string, slave_df):
slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
#return my_value corresponding to the highest score
return slave_df.loc[slave_df.score.idxmax(),'my_value']
master
Out[39]:
original
0 this is a nice sentence
1 this is another one
2 Whosebug is nice
slave
Out[40]:
my_value name
0 1 hello world
1 2 congratulations
2 3 this is a nice sentence
3 4 this is another one
4 5 Whosebug is nice
我需要做的很简单:
- 对于
master
中的每一行,我使用fuzzywuzzy
. 计算的字符串相似度得分查找数据框
slave
以获得最佳匹配
现在让我们把这些数据帧放大一点:
master = pd.concat([master] * 100, ignore_index = True)
slave = pd.concat([slave] * 10, ignore_index = True)
首先,我试过 dask
#prepare the computation
dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))
现在是时间:
#multithreaded
%timeit dmaster.compute(get=dask.threaded.get)
1 loop, best of 3: 346 ms per loop
#multiprocess
%timeit dmaster.compute(get=dask.multiprocessing.get)
1 loop, best of 3: 1.93 s per loop
#good 'ol pandas
%timeit master['my_value'] = master.original.apply(lambda x: helper(x,slave))
100 loops, best of 3: 2.18 ms per loop
其次,我已经尝试过旧的multiprocessing
包
from multiprocessing import Pool, cpu_count
def myfunc(df):
return df.original.apply(lambda x: helper(x, slave))
from datetime import datetime
if __name__ == '__main__':
startTime = datetime.now()
p = Pool(cpu_count() - 1)
ret_list = p.map(myfunc, [master.iloc[1:100,], master.iloc[100:200 ,],
master.iloc[200:300 ,]])
results = pd.concat(ret_list)
print datetime.now() - startTime
给出的时间差不多
runfile('C:/Users/john/untitled6.py', wdir='C:/Users/john')
0:00:01.927000
问题:为什么 Dask
和 multiprocessing
的多处理速度比 Pandas 慢?假设我的真实数据比这大得多。我能得到更好的结果吗?
毕竟我这里考虑的问题是embarassingly parallel
(每一行都是一个独立的问题),所以这些包应该真的大放异彩。
我是不是遗漏了什么?
谢谢!
让我把我的评论总结成一个答案。我希望这些信息有用,因为这里有很多问题。
首先,我想向您指出 distributed.readthedocs.io/en/latest/efficiency.html ,其中讨论了许多 dask 性能主题。请注意,这都是关于分布式调度程序的,但由于它可以在进程内启动,可以使用线程或进程,或这些的组合,它确实取代了以前的 dask 调度程序,并且通常在所有情况下都推荐使用。
1) 创建进程需要时间。这总是正确的,在 windows 上尤其如此。如果您对现实生活中的性能感兴趣,您将只想创建一次进程,开销固定,并且 运行 许多任务。在 dask 中,有 many ways 创建集群,甚至在本地。
2) dask(或任何其他调度程序)处理的每个任务都会产生一些开销。在分布式调度程序的情况下,这是 <1ms,但在任务本身的 运行time 很短的情况下,这可能很重要。
3) 将整个数据集加载到客户端并将其传递给 worker(s) 是 dask 中的一种反模式。相反,您希望使用像 dask.dataframe.read_csv
这样的函数,其中数据由工作人员加载,避免昂贵的序列化和进程间通信。 Dask 非常擅长将计算移动到数据所在的位置,从而最大限度地减少通信。
4) 当进程之间通信时,序列化的方法很重要,这就是我对为什么非 dask 多处理对你来说这么慢的猜测。
5) 最后,并非所有作业都能在 dask 下获得性能提升。这取决于很多因素,但通常最主要的因素是:数据是否适合内存?如果是,可能很难匹配 numpy 和 pandas 中优化良好的方法。一如既往,您应该始终分析您的代码...