如何在 python 并行处理的集群中创建集群?

How to create clusters within a cluster in python parallel processing?

我正在处理一项需要在多个阶段进行并行处理的任务。

我的问题是:

我有四个数据集。我写了一个函数来处理这 4 个数据集。 为了并行处理这 4 个数据集,我创建了 4 个集群,然后将 4 个数据集发送到 4 个集群。同样,我想将每个集群拆分为 4 个集群,因为我必须执行 group by 操作。我写了一段代码来处理上面的问题。但它给我一个错误。这是我的示例代码。

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=2, verbose=10)(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

# This is the function to do group by parallel processing.
def new(x):
    tsc_out = applyParallel(x.groupby(gbcols), custum_func) 

def f1():
    from joblib import Parallel, delayed
    Parallel(n_jobs=4)(delayed(new)(i) for i in range(4)) #4 data sets

如果我们只创建一次集群,这个函数是运行正确的。如果我们尝试将每个集群拆分为多个集群,它是行不通的。 在我的例子中,我需要创建总 20 个集群。

提前致谢。

如果我对你的情况理解正确,你可以使用 charm4py 的池做这样的事情 工人(请参阅 https://charm4py.readthedocs.io/en/latest/pool.html 了解更多信息 信息)。例如:

from charm4py import charm

def applyParallel(dfGrouped, func):
    retLst = charm.pool.map(func, [group for name, group in dfGrouped], ncores=2)
    return pd.concat(retLst)

# This is the function to do group by parallel processing
def new(x):
    tsc_out = applyParallel(x.groupby(gbcols), custom_func)

def f1(args):
    charm.pool.map(new, [i for i in range(4)], ncores=4, allow_nested=True)
    exit()

charm.start(f1)

使用工作池,您可以启动任务(我认为这与您所说的集群相同)指定多少个核心 您想要 运行 一组给定的任务。虽然我认为在你的情况下你可以 只需离开 ncores=-1 并让 charm4py 使用可用内核安排任务。

如果您有任务启动其他任务(如果您使用 allow_nested=True),它也会起作用。

需要注意一点,当你用charm4py启动一个程序时,你会指定启动多少个进程 (您甚至可以在多个主机上启动进程,应用程序可以使用 所有这些同时)。因此,例如,如果您有 8 个内核并且想要利用所有 他们,用 8 个进程启动程序。