许多函数的并行映射 x.f() 而不是 f(x)

Parallel mapping x.f() instead of f(x) for many functions

我有一个非常大的 pandas 数据框,我想在其上映射许多函数。 因为框架很大,我写了一些代码来并行化这个:

import pandas as pd
import numpy as np
from multiprocessing import cpu_count(), Pool

my_frame = pd.DataFrame(...) # A large data frame with the column "data"

def parallel_map(series: pd.Series, func):
    cores = cpu_count()
    partitions = cores
    data_split = np.array_split(series, partitions)
    print(f"Parallelizing with {cores} cores...")
    with Pool(cores) as pool:
        data = pd.concat(pool.map(func, data_split))
    pool.join()
    return data

我想用 pd.Series.map 来称呼它,即我想为每一行计算东西;像这样:

def transform_data(entry):
    # Do expensive stuff
    return entry

非并行,我现在可以做

my_frame["data"].map(transform_data)

然而,对于并行版本,我需要在全局命名空间中定义一个额外的函数来反转调用者,因为Pool.map适用于f(x),但我想调用x.f().该函数需要能够被 pickle-able 才能 运行 由 Pool:

def inverted_transform_data(column: pd.Series):
    return column.map(transform_data)

现在我可以这样调用并行版本了:

parallel_map(data=my_frame["data"], func=inverted_transform_data)

问题是我想为许多需要按顺序处理的函数执行此操作,即 transform_data1, transform_data2, ...。这需要我为它们中的每一个创建这个全局包装函数。

是否有更好的替代品仍然可以腌制?

天啊! https://dask.org/

Dask 是一个专门针对并行 pandas 的项目。我强烈建议您将其用于您的用例。如果您只想在坚持使用 pandas 的同时提高性能,请查看此处的文档:

https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html

我发现这篇文章特别有用:

https://engineering.upside.com/a-beginners-guide-to-optimizing-pandas-code-for-speed-c09ef2c6a4d6

编辑:

用dask你会做:

import dask.dataframe as dd

df = # import method such as dd.read_csv("df.csv")
df.apply(func, ...) # or dd.data_col.apply(func, ...)
df.compute()

我最终得到了一个 "low budget" 解决方案,因为我不想将 dask 作为依赖项引入。它只是创建一个可调用的包装器 class:

class InvertedCallerMap(object):

    def __init__(self, func):
        """
        Required so the parallel map can call x.f() instead of f(x) without running into pickling issues
        :param func: Function to invert from x.f() to f(x)
        """
        self.func = func

    def __call__(self, column: pd.Series):
        return column.map(self.func)


def parallel_map(series, func, invert=True):
    cores = cpu_count()
    partitions = cores
    data_split = np.array_split(series, partitions)
    if invert:
        func = InvertedCallerMap(func=func)
    with Pool(cores) as pool:
        data = pd.concat(pool.map(func, data_split))
    pool.join()
    return data