Parallelization/multiprocessing of conditional for loop

I want to use multiprocessing in Python to speed up a while loop.

More specifically:
I have a matrix (samples × features). I want to select x subsets of samples whose values at a random subset of features are not equal to a certain value (-1 in this case).

My serial version:

import numpy as np
import pandas as pd

np.random.seed(43)
datafile = '...'
df = pd.read_csv(datafile, sep=" ", nrows = 89)

no_feat = 500
no_samp = 5
no_trees = 5
i=0
iter=0


samples = np.zeros((no_trees, no_samp))
features = np.zeros((no_trees, no_feat))

while i < no_trees:
    rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)
    iter_order = np.random.choice(df.shape[0], df.shape[0], replace=False)

    samp_idx = []
    a=0

#--------------
    #how to run in parallel?

    for j in iter_order:
        pot_samp = df.iloc[j, rand_feat]
        if len(np.where(pot_samp==-1)[0]) == 0:
            samp_idx.append(j)
        if len(samp_idx) == no_samp:
            print(a)
            break
        a+=1

#--------------

    if len(samp_idx) == no_samp:
        samples[i,:] = samp_idx
        features[i, :] = rand_feat
        i+=1
    iter+=1
    if iter>1000:   #break if subsets cannot be found
        break

The search for fitting samples is the potentially expensive part (the for loop over j), and in theory it could run in parallel. In some cases it is not necessary to iterate through all samples to find a large enough subset, which is why I break out of the loop as soon as the subset is big enough.
I am struggling to find an implementation that allows checking how many valid results have already been generated. Is that even possible?
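
As it turns out (see the solution further down), a shared counter makes exactly this check possible. A minimal, self-contained sketch of the pattern, where do_search is an illustrative stand-in for one search attempt:

import random
from multiprocessing import Pool, Value, Lock

def do_search():
    # illustrative stand-in for one expensive search attempt;
    # succeeds roughly half of the time
    return random.random() if random.random() > 0.5 else None

def init(shared_found, shared_lock):
    # make the shared objects visible as globals in every worker
    global found, lock
    found, lock = shared_found, shared_lock

def worker(target):
    results = []
    while found.value < target:      # cheap read of the global progress
        res = do_search()
        if res is not None:
            with lock:               # serialize updates to the counter
                found.value += 1
            results.append(res)
    return results

if __name__ == '__main__':
    found = Value('i', 0)            # shared signed int, starts at 0
    lock = Lock()
    with Pool(4, initializer=init, initargs=(found, lock)) as pool:
        chunks = pool.map(worker, [20] * 4)
    # a slight overshoot past 20 is possible because the check and the
    # increment are not one atomic step; for this use case that is fine
    print(sum(len(c) for c in chunks))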

I have used joblib before. If I understand correctly, it uses the pool methods of multiprocessing as a backend, which only works for separate tasks? I am thinking that queues might be helpful, but so far I have failed to implement them.
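
For reference, this is the kind of independent, coordination-free task that joblib's Parallel/delayed is built for (the function and its parameters here are made up); what it lacks out of the box is a way for running calls to see a shared "found so far" count:

import numpy as np
from joblib import Parallel, delayed

def draw_feature_subset(seed, n_total=500, n_pick=50):
    # illustrative independent task: draw one random feature subset
    rng = np.random.RandomState(seed)
    return rng.choice(n_total, n_pick, replace=False)

# every call is independent of the others, so joblib can farm them out;
# there is no built-in shared state to stop early once enough are found
subsets = Parallel(n_jobs=4)(delayed(draw_feature_subset)(s) for s in range(100))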

I found a working solution. I decided to run the while loop in parallel and have the different processes interact over a shared counter. Furthermore, I vectorized the search for suitable samples.

The vectorization yields a speedup of roughly 300x, and running on 4 cores speeds up the computation by about another factor of two.
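
The gist of the vectorization: instead of testing one row at a time, compare the whole (samples × random features) submatrix against -1 in a single boolean reduction. A minimal sketch on made-up data of similar shape:

import numpy as np

rng = np.random.RandomState(43)
# stand-in for df.values: mostly valid entries, a few -1 "missing" markers
data = rng.choice([-1, 0, 1], size=(89, 2000), p=[0.001, 0.5, 0.499])
rand_feat = rng.choice(data.shape[1], 500, replace=False)

# row by row, as in the serial version: one check per sample
slow = [j for j in range(data.shape[0])
        if not (data[j, rand_feat] == -1).any()]

# vectorized: one comparison over the whole submatrix at once
valid = np.where(~(data[:, rand_feat] == -1).any(axis=1))[0]

assert slow == list(valid)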

First I tried implementing separate processes and putting the results into a Queue. It turned out these are not meant to store large amounts of data.
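
One pitfall that likely played a role here (it is documented in the multiprocessing programming guidelines; the array size below is illustrative): a process that has put large items on a Queue blocks on exit until they are consumed, so the queue must be drained before joining:

import numpy as np
from multiprocessing import Process, Queue

def produce(q):
    q.put(np.zeros((1000, 1000)))   # ~8 MB pushed through the underlying pipe

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce, args=(q,))
    p.start()
    result = q.get()   # drain BEFORE join; joining first can deadlock,
    p.join()           # since the child blocks while flushing the large item
    print(result.shape)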

If anyone sees another bottleneck in this code, I would be glad if someone pointed it out.

With my fundamental lack of knowledge about parallel computing, I found it really hard to piece this together, especially since the examples on the Internet are all very basic. I learned a lot though =)

My code:

import numpy as np
import pandas as pd
import itertools
from multiprocessing import Pool, Lock, Value
from datetime import datetime
import settings


val = Value('i', 0)
worker_ID = Value('i', 1)
lock = Lock()

def findSamp(no_trees, df, no_feat, no_samp):
    # hand out a unique worker ID under the lock
    lock.acquire()
    worker_ID_local = worker_ID.value
    worker_ID.value += 1
    lock.release()
    print('starting worker - {0}'.format(worker_ID_local))

    max_iter = 100000
    samp = []
    feat = []
    iter_outer = 0
    iter = 0
    while val.value < no_trees and iter_outer<max_iter:
        rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)

        #get samples with random features from dataset;
        #find and select samples that don't have missing values in the random features
        samp_rand = df.iloc[:,rand_feat]
        nan_idx = np.unique(np.where(samp_rand == -1)[0])
        all_idx = np.arange(df.shape[0])
        notnan_bool = np.invert(np.in1d(all_idx, nan_idx))
        notnan_idx = np.where(notnan_bool == True)[0]

        if notnan_idx.shape[0] >= no_samp:
            #if enough samples for random feature subset, select no_samp samples randomly
            notnan_idx_rand = np.random.choice(notnan_idx, no_samp, replace=False)
            rand_feat_rand = rand_feat

            lock.acquire()
            val.value += 1
            #x = val.value
            lock.release()
            #print('no of trees generated: {0}'.format(x))
            samp.append(notnan_idx_rand)
            feat.append(rand_feat_rand)

        else:
            #increase iter_outer counter if no sample subset could be found for random feature subset
            iter_outer += 1

        iter+=1
    if iter_outer >= max_iter:
        print('exiting worker{0} because iter_outer >= max_iter'.format(worker_ID_local))
    else:
        print('worker{0} - finished'.format(worker_ID_local))
    return samp, feat

def initialize(*args):
    # make the shared counter, worker ID and lock visible in each worker process
    global val, worker_ID, lock
    val, worker_ID, lock = args

def star_findSamp(i_df_no_feat_no_samp):
    # Pool.map passes a single argument, so unpack the tuple here
    return findSamp(*i_df_no_feat_no_samp)


if __name__ == '__main__':
    np.random.seed(43)
    datafile = '...'
    df = pd.read_csv(datafile, sep=" ", nrows = 89)
    df = df.fillna(-1)
    df = df.iloc[:, 6:]

    no_feat = 700
    no_samp = 10
    no_trees = 5000


    startTime = datetime.now()
    print('starting multiprocessing')
    ncores = 4
    p = Pool(ncores, initializer=initialize, initargs=(val, worker_ID, lock))
    args = zip([no_trees]*ncores, itertools.repeat(df), itertools.repeat(no_feat), itertools.repeat(no_samp))

    result = p.map(star_findSamp, args)
    p.close()
    p.join()

    print('{0} sample subsets for tree training have been found'.format(val.value))

    samples = [x[0] for x in result if x is not None]
    samples = np.vstack(samples)
    features = [x[1] for x in result if x is not None]
    features = np.vstack(features)
    print(datetime.now() - startTime)