Python 多处理并按顺序保存数据

Python multiprocessing and save data in order

我正在逐行分析一个 cvs 文件数据。对于每一行,我将生成一个字符串来存储分析结果。

由于文件很大,我必须进行多处理。但是因为我必须通过索引访问结果,所以我必须按顺序存储它们(到本地文件)。

我尝试过的一种方法是使用锁,但它仍然很慢。如果我不加锁,运行速度很快,但是数据会乱。

我想做的是将这些结果存储到全局列表中。当所有子流程完成后,我可以写入本地文件。 任何提示如何在没有锁的情况下进行以及如何加快速度?

以下是我的多处理部分代码:

def worker(dat,fileName,l):
    l.acquire()
    target = open(fileName,"a")
    for values in dat:
        # recursively apply different start mean2, find best solution
        model = MixGaussian(values)
        bf = model.findBestFit()
        # find the value where it's equally probable belongs to two gaussians 
        m1 = bf[0]
        m2 = bf[1]
        d1 = bf[2]
        d2 = bf[3]

        # calculate x
        k = math.log((d1+0.001)/(d2+0.001))* d1 * d2
        a = d1 -d2 
        b = 2 * (m1*d2 - m2 * d1)
        c = m2 * m2 * d1 - m1* m1 * d2 - k 
        delta = -1 * math.sqrt(b*b - 4 * a * c)
        if a == 0:
            a += 0.01
        x = (-1 * b + delta) / (2 * a)
        bf.append(x)
        print bf

        target.write(",".join(str(ele) for ele in bf))
        target.write("\n")

    target.close()
    l.release()

if __name__ == "__main__":
    # read from line 8000 to 8999
    data = readFile("unc_expr.tsv",8000,9000)
    target = open("modelstest9.csv","w")
    target.write("mean_1,mean_2,deviantion_1,deviation_2,cross_value")
    target.write("\n")
    target.close()

    numPrcs = 16
    d = []
    for i in range(numPrcs-1):
        d.append(data[i*len(data)/numPrcs:(i+1) *len(data)/numPrcs])
    d.append(data[(numPrcs-1)*len(data)/numPrcs:])

    start_time = time.time()
    lock = Lock()
    print("start time: %s"%start_time)

    for i in range(numPrcs):    
        Process(target=worker,args=(d[i],"modelstest9.csv",lock)).start()

谢谢!!

ThreadPoolExecuter 非常适合 运行 将相同的方法与不同的数据集并行处理,并重新组装每个线程的结果。

当结果从每个线程返回时,将它们放在一个列表中,例如 [(index1, result1), (index2, result2),...] as/when 他们 return,当所有线程完成时按索引对列表排序,然后写入排序列表到文件。

这里要注意的是,这会将所有结果保存在内存中,但您应该能够这样做,因为您已经在每个进程中将它们全部保存在内存中。

我建议使用 multiprocessing.Pool 并使用其 imap 方法来完成这项工作。让工作人员 return 值而不是直接写入它们,主进程执行所有 I/O。 imap 保证您按照任务的调度顺序获得结果,并且只有主进程执行 I/O,不可能发生冲突。

这也是一项改进,因为您可以将工作分成固定的块,而不是仔细划分工作以匹配您希望启动的进程数。 multiprocessing.Pool,默认情况下,会生成与您的 CPU 核心数量相等的工作人员(因此无需手动指定工作人员数量,并且有太少、浪费核心或太多、浪费时间的风险开关)。并且map/imap和公司将在N个工人中无缝拆分X个工作项,而不需要确保工作项的数量等于工作项的数量。