Python 多处理并按顺序保存数据
Python multiprocessing and save data in order
我正在逐行分析一个 cvs 文件数据。对于每一行,我将生成一个字符串来存储分析结果。
由于文件很大,我必须进行多处理。但是因为我必须通过索引访问结果,所以我必须按顺序存储它们(到本地文件)。
我尝试过的一种方法是使用锁,但它仍然很慢。如果我不加锁,运行速度很快,但是数据会乱。
我想做的是将这些结果存储到全局列表中。当所有子流程完成后,我可以写入本地文件。
任何提示如何在没有锁的情况下进行以及如何加快速度?
以下是我的多处理部分代码:
def worker(dat,fileName,l):
l.acquire()
target = open(fileName,"a")
for values in dat:
# recursively apply different start mean2, find best solution
model = MixGaussian(values)
bf = model.findBestFit()
# find the value where it's equally probable belongs to two gaussians
m1 = bf[0]
m2 = bf[1]
d1 = bf[2]
d2 = bf[3]
# calculate x
k = math.log((d1+0.001)/(d2+0.001))* d1 * d2
a = d1 -d2
b = 2 * (m1*d2 - m2 * d1)
c = m2 * m2 * d1 - m1* m1 * d2 - k
delta = -1 * math.sqrt(b*b - 4 * a * c)
if a == 0:
a += 0.01
x = (-1 * b + delta) / (2 * a)
bf.append(x)
print bf
target.write(",".join(str(ele) for ele in bf))
target.write("\n")
target.close()
l.release()
if __name__ == "__main__":
# read from line 8000 to 8999
data = readFile("unc_expr.tsv",8000,9000)
target = open("modelstest9.csv","w")
target.write("mean_1,mean_2,deviantion_1,deviation_2,cross_value")
target.write("\n")
target.close()
numPrcs = 16
d = []
for i in range(numPrcs-1):
d.append(data[i*len(data)/numPrcs:(i+1) *len(data)/numPrcs])
d.append(data[(numPrcs-1)*len(data)/numPrcs:])
start_time = time.time()
lock = Lock()
print("start time: %s"%start_time)
for i in range(numPrcs):
Process(target=worker,args=(d[i],"modelstest9.csv",lock)).start()
谢谢!!
ThreadPoolExecuter 非常适合 运行 将相同的方法与不同的数据集并行处理,并重新组装每个线程的结果。
当结果从每个线程返回时,将它们放在一个列表中,例如 [(index1, result1), (index2, result2),...]
as/when 他们 return,当所有线程完成时按索引对列表排序,然后写入排序列表到文件。
这里要注意的是,这会将所有结果保存在内存中,但您应该能够这样做,因为您已经在每个进程中将它们全部保存在内存中。
我建议使用 multiprocessing.Pool
并使用其 imap
方法来完成这项工作。让工作人员 return
值而不是直接写入它们,主进程执行所有 I/O。 imap
保证您按照任务的调度顺序获得结果,并且只有主进程执行 I/O,不可能发生冲突。
这也是一项改进,因为您可以将工作分成固定的块,而不是仔细划分工作以匹配您希望启动的进程数。 multiprocessing.Pool
,默认情况下,会生成与您的 CPU 核心数量相等的工作人员(因此无需手动指定工作人员数量,并且有太少、浪费核心或太多、浪费时间的风险开关)。并且map
/imap
和公司将在N个工人中无缝拆分X个工作项,而不需要确保工作项的数量等于工作项的数量。
我正在逐行分析一个 cvs 文件数据。对于每一行,我将生成一个字符串来存储分析结果。
由于文件很大,我必须进行多处理。但是因为我必须通过索引访问结果,所以我必须按顺序存储它们(到本地文件)。
我尝试过的一种方法是使用锁,但它仍然很慢。如果我不加锁,运行速度很快,但是数据会乱。
我想做的是将这些结果存储到全局列表中。当所有子流程完成后,我可以写入本地文件。 任何提示如何在没有锁的情况下进行以及如何加快速度?
以下是我的多处理部分代码:
def worker(dat,fileName,l):
l.acquire()
target = open(fileName,"a")
for values in dat:
# recursively apply different start mean2, find best solution
model = MixGaussian(values)
bf = model.findBestFit()
# find the value where it's equally probable belongs to two gaussians
m1 = bf[0]
m2 = bf[1]
d1 = bf[2]
d2 = bf[3]
# calculate x
k = math.log((d1+0.001)/(d2+0.001))* d1 * d2
a = d1 -d2
b = 2 * (m1*d2 - m2 * d1)
c = m2 * m2 * d1 - m1* m1 * d2 - k
delta = -1 * math.sqrt(b*b - 4 * a * c)
if a == 0:
a += 0.01
x = (-1 * b + delta) / (2 * a)
bf.append(x)
print bf
target.write(",".join(str(ele) for ele in bf))
target.write("\n")
target.close()
l.release()
if __name__ == "__main__":
# read from line 8000 to 8999
data = readFile("unc_expr.tsv",8000,9000)
target = open("modelstest9.csv","w")
target.write("mean_1,mean_2,deviantion_1,deviation_2,cross_value")
target.write("\n")
target.close()
numPrcs = 16
d = []
for i in range(numPrcs-1):
d.append(data[i*len(data)/numPrcs:(i+1) *len(data)/numPrcs])
d.append(data[(numPrcs-1)*len(data)/numPrcs:])
start_time = time.time()
lock = Lock()
print("start time: %s"%start_time)
for i in range(numPrcs):
Process(target=worker,args=(d[i],"modelstest9.csv",lock)).start()
谢谢!!
ThreadPoolExecuter 非常适合 运行 将相同的方法与不同的数据集并行处理,并重新组装每个线程的结果。
当结果从每个线程返回时,将它们放在一个列表中,例如 [(index1, result1), (index2, result2),...]
as/when 他们 return,当所有线程完成时按索引对列表排序,然后写入排序列表到文件。
这里要注意的是,这会将所有结果保存在内存中,但您应该能够这样做,因为您已经在每个进程中将它们全部保存在内存中。
我建议使用 multiprocessing.Pool
并使用其 imap
方法来完成这项工作。让工作人员 return
值而不是直接写入它们,主进程执行所有 I/O。 imap
保证您按照任务的调度顺序获得结果,并且只有主进程执行 I/O,不可能发生冲突。
这也是一项改进,因为您可以将工作分成固定的块,而不是仔细划分工作以匹配您希望启动的进程数。 multiprocessing.Pool
,默认情况下,会生成与您的 CPU 核心数量相等的工作人员(因此无需手动指定工作人员数量,并且有太少、浪费核心或太多、浪费时间的风险开关)。并且map
/imap
和公司将在N个工人中无缝拆分X个工作项,而不需要确保工作项的数量等于工作项的数量。