许多函数的并行映射 x.f() 而不是 f(x)
Parallel mapping x.f() instead of f(x) for many functions
我有一个非常大的 pandas 数据框,我想在其上映射许多函数。
因为框架很大,我写了一些代码来并行化这个:
import pandas as pd
import numpy as np
from multiprocessing import cpu_count(), Pool
my_frame = pd.DataFrame(...) # A large data frame with the column "data"
def parallel_map(series: pd.Series, func):
cores = cpu_count()
partitions = cores
data_split = np.array_split(series, partitions)
print(f"Parallelizing with {cores} cores...")
with Pool(cores) as pool:
data = pd.concat(pool.map(func, data_split))
pool.join()
return data
我想用 pd.Series.map
来称呼它,即我想为每一行计算东西;像这样:
def transform_data(entry):
# Do expensive stuff
return entry
非并行,我现在可以做
my_frame["data"].map(transform_data)
然而,对于并行版本,我需要在全局命名空间中定义一个额外的函数来反转调用者,因为Pool.map
适用于f(x)
,但我想调用x.f()
.该函数需要能够被 pickle-able 才能 运行 由 Pool:
def inverted_transform_data(column: pd.Series):
return column.map(transform_data)
现在我可以这样调用并行版本了:
parallel_map(data=my_frame["data"], func=inverted_transform_data)
问题是我想为许多需要按顺序处理的函数执行此操作,即 transform_data1, transform_data2, ...
。这需要我为它们中的每一个创建这个全局包装函数。
是否有更好的替代品仍然可以腌制?
Dask 是一个专门针对并行 pandas 的项目。我强烈建议您将其用于您的用例。如果您只想在坚持使用 pandas 的同时提高性能,请查看此处的文档:
https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html
我发现这篇文章特别有用:
https://engineering.upside.com/a-beginners-guide-to-optimizing-pandas-code-for-speed-c09ef2c6a4d6
编辑:
用dask你会做:
import dask.dataframe as dd
df = # import method such as dd.read_csv("df.csv")
df.apply(func, ...) # or dd.data_col.apply(func, ...)
df.compute()
我最终得到了一个 "low budget" 解决方案,因为我不想将 dask 作为依赖项引入。它只是创建一个可调用的包装器 class:
class InvertedCallerMap(object):
def __init__(self, func):
"""
Required so the parallel map can call x.f() instead of f(x) without running into pickling issues
:param func: Function to invert from x.f() to f(x)
"""
self.func = func
def __call__(self, column: pd.Series):
return column.map(self.func)
def parallel_map(series, func, invert=True):
cores = cpu_count()
partitions = cores
data_split = np.array_split(series, partitions)
if invert:
func = InvertedCallerMap(func=func)
with Pool(cores) as pool:
data = pd.concat(pool.map(func, data_split))
pool.join()
return data
我有一个非常大的 pandas 数据框,我想在其上映射许多函数。 因为框架很大,我写了一些代码来并行化这个:
import pandas as pd
import numpy as np
from multiprocessing import cpu_count(), Pool
my_frame = pd.DataFrame(...) # A large data frame with the column "data"
def parallel_map(series: pd.Series, func):
cores = cpu_count()
partitions = cores
data_split = np.array_split(series, partitions)
print(f"Parallelizing with {cores} cores...")
with Pool(cores) as pool:
data = pd.concat(pool.map(func, data_split))
pool.join()
return data
我想用 pd.Series.map
来称呼它,即我想为每一行计算东西;像这样:
def transform_data(entry):
# Do expensive stuff
return entry
非并行,我现在可以做
my_frame["data"].map(transform_data)
然而,对于并行版本,我需要在全局命名空间中定义一个额外的函数来反转调用者,因为Pool.map
适用于f(x)
,但我想调用x.f()
.该函数需要能够被 pickle-able 才能 运行 由 Pool:
def inverted_transform_data(column: pd.Series):
return column.map(transform_data)
现在我可以这样调用并行版本了:
parallel_map(data=my_frame["data"], func=inverted_transform_data)
问题是我想为许多需要按顺序处理的函数执行此操作,即 transform_data1, transform_data2, ...
。这需要我为它们中的每一个创建这个全局包装函数。
是否有更好的替代品仍然可以腌制?
Dask 是一个专门针对并行 pandas 的项目。我强烈建议您将其用于您的用例。如果您只想在坚持使用 pandas 的同时提高性能,请查看此处的文档:
https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html
我发现这篇文章特别有用:
https://engineering.upside.com/a-beginners-guide-to-optimizing-pandas-code-for-speed-c09ef2c6a4d6
编辑:
用dask你会做:
import dask.dataframe as dd
df = # import method such as dd.read_csv("df.csv")
df.apply(func, ...) # or dd.data_col.apply(func, ...)
df.compute()
我最终得到了一个 "low budget" 解决方案,因为我不想将 dask 作为依赖项引入。它只是创建一个可调用的包装器 class:
class InvertedCallerMap(object):
def __init__(self, func):
"""
Required so the parallel map can call x.f() instead of f(x) without running into pickling issues
:param func: Function to invert from x.f() to f(x)
"""
self.func = func
def __call__(self, column: pd.Series):
return column.map(self.func)
def parallel_map(series, func, invert=True):
cores = cpu_count()
partitions = cores
data_split = np.array_split(series, partitions)
if invert:
func = InvertedCallerMap(func=func)
with Pool(cores) as pool:
data = pd.concat(pool.map(func, data_split))
pool.join()
return data