如何在 python 并行处理的集群中创建集群?
How to create clusters within a cluster in python parallel processing?
我正在处理一项需要在多个阶段进行并行处理的任务。
我的问题是:
我有四个数据集。我写了一个函数来处理这 4 个数据集。
为了并行处理这 4 个数据集,我创建了 4 个集群,然后将 4 个数据集发送到 4 个集群。同样,我想将每个集群拆分为 4 个集群,因为我必须执行 group by
操作。我写了一段代码来处理上面的问题。但它给我一个错误。这是我的示例代码。
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=2, verbose=10)(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)
# This is the function to do group by parallel processing.
def new(x):
tsc_out = applyParallel(x.groupby(gbcols), custum_func)
def f1():
from joblib import Parallel, delayed
Parallel(n_jobs=4)(delayed(new)(i) for i in range(4)) #4 data sets
如果我们只创建一次集群,这个函数是运行正确的。如果我们尝试将每个集群拆分为多个集群,它是行不通的。
在我的例子中,我需要创建总 20
个集群。
提前致谢。
如果我对你的情况理解正确,你可以使用 charm4py 的池做这样的事情
工人(请参阅 https://charm4py.readthedocs.io/en/latest/pool.html 了解更多信息
信息)。例如:
from charm4py import charm
def applyParallel(dfGrouped, func):
retLst = charm.pool.map(func, [group for name, group in dfGrouped], ncores=2)
return pd.concat(retLst)
# This is the function to do group by parallel processing
def new(x):
tsc_out = applyParallel(x.groupby(gbcols), custom_func)
def f1(args):
charm.pool.map(new, [i for i in range(4)], ncores=4, allow_nested=True)
exit()
charm.start(f1)
使用工作池,您可以启动任务(我认为这与您所说的集群相同)指定多少个核心
您想要 运行 一组给定的任务。虽然我认为在你的情况下你可以
只需离开 ncores=-1
并让 charm4py 使用可用内核安排任务。
如果您有任务启动其他任务(如果您使用 allow_nested=True
),它也会起作用。
需要注意一点,当你用charm4py启动一个程序时,你会指定启动多少个进程
(您甚至可以在多个主机上启动进程,应用程序可以使用
所有这些同时)。因此,例如,如果您有 8 个内核并且想要利用所有
他们,用 8 个进程启动程序。
我正在处理一项需要在多个阶段进行并行处理的任务。
我的问题是:
我有四个数据集。我写了一个函数来处理这 4 个数据集。
为了并行处理这 4 个数据集,我创建了 4 个集群,然后将 4 个数据集发送到 4 个集群。同样,我想将每个集群拆分为 4 个集群,因为我必须执行 group by
操作。我写了一段代码来处理上面的问题。但它给我一个错误。这是我的示例代码。
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=2, verbose=10)(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)
# This is the function to do group by parallel processing.
def new(x):
tsc_out = applyParallel(x.groupby(gbcols), custum_func)
def f1():
from joblib import Parallel, delayed
Parallel(n_jobs=4)(delayed(new)(i) for i in range(4)) #4 data sets
如果我们只创建一次集群,这个函数是运行正确的。如果我们尝试将每个集群拆分为多个集群,它是行不通的。
在我的例子中,我需要创建总 20
个集群。
提前致谢。
如果我对你的情况理解正确,你可以使用 charm4py 的池做这样的事情 工人(请参阅 https://charm4py.readthedocs.io/en/latest/pool.html 了解更多信息 信息)。例如:
from charm4py import charm
def applyParallel(dfGrouped, func):
retLst = charm.pool.map(func, [group for name, group in dfGrouped], ncores=2)
return pd.concat(retLst)
# This is the function to do group by parallel processing
def new(x):
tsc_out = applyParallel(x.groupby(gbcols), custom_func)
def f1(args):
charm.pool.map(new, [i for i in range(4)], ncores=4, allow_nested=True)
exit()
charm.start(f1)
使用工作池,您可以启动任务(我认为这与您所说的集群相同)指定多少个核心
您想要 运行 一组给定的任务。虽然我认为在你的情况下你可以
只需离开 ncores=-1
并让 charm4py 使用可用内核安排任务。
如果您有任务启动其他任务(如果您使用 allow_nested=True
),它也会起作用。
需要注意一点,当你用charm4py启动一个程序时,你会指定启动多少个进程 (您甚至可以在多个主机上启动进程,应用程序可以使用 所有这些同时)。因此,例如,如果您有 8 个内核并且想要利用所有 他们,用 8 个进程启动程序。