Python 对大容量文件的 csv 数据进行多处理写入
Python Multiprocessing write to csv data for huge volume files
我正在尝试进行计算并使用多处理程序将其写入另一个 txt 文件。我在输出 txt 文件中得到计数不匹配。每次执行我都会得到不同的输出计数。
我是 python 的新手,有人可以帮忙吗?
import pandas as pd
import multiprocessing as mp
source = "\share\usr\data.txt"
target = "\share\usr\data_masked.txt"
Chunk = 10000
def process_calc(df):
'''
get source df do calc and return newdf
...
'''
return(newdf)
def calc_frame(df):
output_df = process_calc(df)
output_df.to_csv(target,index=None,sep='|',mode='a',header=False)
if __name__ == '__main__':
reader= pd.read_table(source,sep='|',chunksize = chunk,encoding='ANSI')
pool = mp.Pool(mp.cpu_count())
jobs = []
for each_df in reader:
process = mp.Process(target=calc_frame,args=(each_df)
jobs.append(process)
process.start()
for j in jobs:
j.join()
您发布的源代码中有几个问题,甚至会阻止它编译,更不用说 运行。我试图纠正这些问题,以解决您的主要问题。但是请务必彻底检查下面的代码,以确保更正有意义。
首先,Process
构造函数的 args 参数应指定为 tuple
。您指定了 args=(each_df)
,但 (each_df)
是 而不是 一个 tuple
,它是一个简单的括号表达式;您需要 (each_df,)
才能生成 if a tuple
(该语句也缺少右括号)。
除了没有针对同时尝试附加到同一文件的多个进程做出任何准备之外,您遇到的问题是您无法确定进程完成的顺序,因此您无法真正控制顺序其中数据帧将附加到 csv 文件。
解决方案是使用具有 imap
method 的处理池。传递给此方法的 iterable 只是 reader
,当迭代 return 时,它是下一个要处理的数据帧。来自 imap
的 return 值是一个 iterable,当迭代时将 return 来自 calc_frame
的下一个 return 值 按任务提交顺序,即与提交数据帧的顺序相同。因此,随着这些新的、修改后的数据帧被 returned,主进程可以简单地将它们一一附加到输出文件中:
import pandas as pd
import multiprocessing as mp
source = r"\share\usr\data.txt"
target = r"\share\usr\data_masked.txt"
Chunk = 10000
def process_calc(df):
'''
get source df do calc and return newdf
...
'''
return(newdf)
def calc_frame(df):
output_df = process_calc(df)
return output_df
if __name__ == '__main__':
with mp.Pool() as pool:
reader = pd.read_table(source, sep='|', chunksize=Chunk, encoding='ANSI')
for output_df in pool.imap(process_calc, reader):
output_df.to_csv(target, index=None, sep='|', mode='a', header=False)
我正在尝试进行计算并使用多处理程序将其写入另一个 txt 文件。我在输出 txt 文件中得到计数不匹配。每次执行我都会得到不同的输出计数。
我是 python 的新手,有人可以帮忙吗?
import pandas as pd
import multiprocessing as mp
source = "\share\usr\data.txt"
target = "\share\usr\data_masked.txt"
Chunk = 10000
def process_calc(df):
'''
get source df do calc and return newdf
...
'''
return(newdf)
def calc_frame(df):
output_df = process_calc(df)
output_df.to_csv(target,index=None,sep='|',mode='a',header=False)
if __name__ == '__main__':
reader= pd.read_table(source,sep='|',chunksize = chunk,encoding='ANSI')
pool = mp.Pool(mp.cpu_count())
jobs = []
for each_df in reader:
process = mp.Process(target=calc_frame,args=(each_df)
jobs.append(process)
process.start()
for j in jobs:
j.join()
您发布的源代码中有几个问题,甚至会阻止它编译,更不用说 运行。我试图纠正这些问题,以解决您的主要问题。但是请务必彻底检查下面的代码,以确保更正有意义。
首先,Process
构造函数的 args 参数应指定为 tuple
。您指定了 args=(each_df)
,但 (each_df)
是 而不是 一个 tuple
,它是一个简单的括号表达式;您需要 (each_df,)
才能生成 if a tuple
(该语句也缺少右括号)。
除了没有针对同时尝试附加到同一文件的多个进程做出任何准备之外,您遇到的问题是您无法确定进程完成的顺序,因此您无法真正控制顺序其中数据帧将附加到 csv 文件。
解决方案是使用具有 imap
method 的处理池。传递给此方法的 iterable 只是 reader
,当迭代 return 时,它是下一个要处理的数据帧。来自 imap
的 return 值是一个 iterable,当迭代时将 return 来自 calc_frame
的下一个 return 值 按任务提交顺序,即与提交数据帧的顺序相同。因此,随着这些新的、修改后的数据帧被 returned,主进程可以简单地将它们一一附加到输出文件中:
import pandas as pd
import multiprocessing as mp
source = r"\share\usr\data.txt"
target = r"\share\usr\data_masked.txt"
Chunk = 10000
def process_calc(df):
'''
get source df do calc and return newdf
...
'''
return(newdf)
def calc_frame(df):
output_df = process_calc(df)
return output_df
if __name__ == '__main__':
with mp.Pool() as pool:
reader = pd.read_table(source, sep='|', chunksize=Chunk, encoding='ANSI')
for output_df in pool.imap(process_calc, reader):
output_df.to_csv(target, index=None, sep='|', mode='a', header=False)