Pandas 简单 parralel/multiprocess 计算
Pandas easy parralel/multiprocess calculations
我正在寻找一种快速且易于使用的解决方案来使用 pandas 进行并行计算。我知道这对数据科学来说是一个非常重要的主题,但我没有找到 简单、比标准 pandas 快得多 df.apply
功能,总体上 实施速度快 !
所以...
让我们快速浏览一下可用的 tools/frameworks。当然,我假设不谈论 asyncio
这与我的主题没有直接关系。
达斯克
请在https://towardsdatascience.com/how-i-learned-to-love-parallelized-applies-with-python-pandas-dask-and-numba-f06b0b367138上找一篇好文章
或者直接在 Dask 网站上:http://docs.dask.org/en/latest/use-cases.html
在下面找到目前不起作用但给我们一个很好的实施思路的片段:
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
cores = cpu_count()
dd.from_pandas(my_df,npartitions=cores).\
map_partitions(
lambda df : df.apply(
lambda x : nearest_street(x.lat,x.lon),axis=1)).\
compute(get=get)
就我个人而言,我觉得这个实现很痛苦(好吧,也许我是个懒人),但总的来说,我发现这个实现不是很快,有时比旧时尚慢df[feature] = df.feature.apply(my_funct)
多处理
在下面找到一段代码 运行 可以轻松地完成多进程任务,但是...使用 HDD IO。此实现 可能工作也可能不工作 ,但让我们对代码实现有了很好的了解
import os
from multiprocessing import Process, cpu_count
from math import ceil
from tqdm import tqdm
import numpy as np
def chunks(l, n) :
numbs = [ceil(i) for i in np.linspace(0,len(l)+1, n+1)]
pairs = list()
for i, val in enumerate(numbs) :
try :
pairs.append((numbs[i], numbs[i+1]))
except :
return pairs
def my_funct(i0=0, i1=10000000) :
for n in tqdm(features[i0:i1]) :
_df = df.loc[df.feature == n, :]
_df = do_something_complex(_df)
_df.to_csv(f"{my_path}/feat-{n}.csv", index=False)
# multiprocessing
cores = cpu_count()
features = df.feature.unique()
if cores < 2 :
my_funct(i0=0, i1=100000000)
else :
chks = chunks(features, cores)
process_list = [Process(target=my_funct, args=chk) \
for chk in chks]
[i.start() for i in process_list]
[i.join() for i in process_list]
# join files and 'merge' in our new_df
new_df = pd.DataFrame(columns=df.columns)
for filename in os.listdir(my_path) :
new_df = new_df.append(pd.read_csv(f'{my_path}/{filename}'),\
axis=0, ignore_index=True)
os.remove(f'{my_path}/{filename}')
好吧,这个实现有点矫枉过正了,但是 1/ 它在大多数情况下都有效,2/ 它很容易理解,并且 3/ 它比 df = df.apply(my_funct) 快并且 --有时——比 Dask
快
但是 ... 假设我在统计上不能成为处理此类主题的 only/first 人...
你能帮帮我吗?
有什么解决办法吗?
有没有类似的东西:
- df.multi_process_apply(my_funct) 或
- df.parralel_apply(my_func)
非常感谢!
你可以试试Pandarallel.
免责声明:我是这个库的作者(它仍在开发中,但你已经可以用它取得很好的效果)。
没有并行化:
并行化:
只需将 df.apply(func)
替换为 df.parallel_apply(func)
即可使用所有 CPU。
我正在寻找一种快速且易于使用的解决方案来使用 pandas 进行并行计算。我知道这对数据科学来说是一个非常重要的主题,但我没有找到 简单、比标准 pandas 快得多 df.apply
功能,总体上 实施速度快 !
所以...
让我们快速浏览一下可用的 tools/frameworks。当然,我假设不谈论 asyncio
这与我的主题没有直接关系。
达斯克
请在https://towardsdatascience.com/how-i-learned-to-love-parallelized-applies-with-python-pandas-dask-and-numba-f06b0b367138上找一篇好文章 或者直接在 Dask 网站上:http://docs.dask.org/en/latest/use-cases.html
在下面找到目前不起作用但给我们一个很好的实施思路的片段:
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
cores = cpu_count()
dd.from_pandas(my_df,npartitions=cores).\
map_partitions(
lambda df : df.apply(
lambda x : nearest_street(x.lat,x.lon),axis=1)).\
compute(get=get)
就我个人而言,我觉得这个实现很痛苦(好吧,也许我是个懒人),但总的来说,我发现这个实现不是很快,有时比旧时尚慢df[feature] = df.feature.apply(my_funct)
多处理
在下面找到一段代码 运行 可以轻松地完成多进程任务,但是...使用 HDD IO。此实现 可能工作也可能不工作 ,但让我们对代码实现有了很好的了解
import os
from multiprocessing import Process, cpu_count
from math import ceil
from tqdm import tqdm
import numpy as np
def chunks(l, n) :
numbs = [ceil(i) for i in np.linspace(0,len(l)+1, n+1)]
pairs = list()
for i, val in enumerate(numbs) :
try :
pairs.append((numbs[i], numbs[i+1]))
except :
return pairs
def my_funct(i0=0, i1=10000000) :
for n in tqdm(features[i0:i1]) :
_df = df.loc[df.feature == n, :]
_df = do_something_complex(_df)
_df.to_csv(f"{my_path}/feat-{n}.csv", index=False)
# multiprocessing
cores = cpu_count()
features = df.feature.unique()
if cores < 2 :
my_funct(i0=0, i1=100000000)
else :
chks = chunks(features, cores)
process_list = [Process(target=my_funct, args=chk) \
for chk in chks]
[i.start() for i in process_list]
[i.join() for i in process_list]
# join files and 'merge' in our new_df
new_df = pd.DataFrame(columns=df.columns)
for filename in os.listdir(my_path) :
new_df = new_df.append(pd.read_csv(f'{my_path}/{filename}'),\
axis=0, ignore_index=True)
os.remove(f'{my_path}/{filename}')
好吧,这个实现有点矫枉过正了,但是 1/ 它在大多数情况下都有效,2/ 它很容易理解,并且 3/ 它比 df = df.apply(my_funct) 快并且 --有时——比 Dask
快但是 ... 假设我在统计上不能成为处理此类主题的 only/first 人...
你能帮帮我吗? 有什么解决办法吗? 有没有类似的东西:
- df.multi_process_apply(my_funct) 或
- df.parralel_apply(my_func)
非常感谢!
你可以试试Pandarallel.
免责声明:我是这个库的作者(它仍在开发中,但你已经可以用它取得很好的效果)。
没有并行化:
并行化:
只需将 df.apply(func)
替换为 df.parallel_apply(func)
即可使用所有 CPU。