Missing/Lost 使用 python 和 pandas 的多处理时的文件输出
Missing/Lost file out put when using multiprocessing with python and pandas
我有一套股票价格历史,文件夹'Raw'下包含8160个文件。我不是使用 pandas 对它们中的每一个进行一些调整,而是将新文件输出到另一个文件夹 'Cleaned'。任何处理失败的文件,应写入名为 'Failed'.
的文件夹中
所有文件都是 csv。
我运行下面的代码后,发现下面没有文件失败。但是大约有 100 个文件丢失了。 'Clean' 文件夹仅包含 8060 个文件。
我检查了差异并找出了丢失的文件。我发现所有文件的格式都是正确的,如果我用 for 循环的方式可以正常处理。所以我怀疑可能是我没有正确使用多进程。
这是代码。
def cleanup(file):
print('Working on ' + file)
stock = pd.read_csv(raw_folder_path + file)
try:
del stock['Unnamed: 0']
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock.to_csv(clean_folder_path + file)
except:
print(file + ' Not Successfull')
stock.to_csv(failed_folder_path + file)
#run multiprocessing
files_list = os.listdir(raw_folder_path)
pool = multiprocessing.Pool()
pool.map(cleanup, files_list)
我希望有人能告诉我我做错了什么?谢谢!
我已将建议放入问题评论中。最终代码大概应该是这样的:
import multiprocessing, os, datetime
import pandas as pd
raw_folder_path = './input/'
clean_folder_path = './clean/'
failed_folder_path = './failed/'
def cleanup(file):
print('Working on ' + file)
stock = pd.read_csv(raw_folder_path + file)
try:
del stock['Unnamed: 0']
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock.to_csv(clean_folder_path + file)
assert os.path.exists(clean_folder_path + file)
return True
except Exception as ex:
print(file + ' Not Successfull ', ex)
stock.to_csv(failed_folder_path + file)
assert os.path.exists(failed_folder_path + file)
return False
if __name__ == '__main__':
files_list = os.listdir(raw_folder_path)
with multiprocessing.Pool() as pool:
results = pool.map(cleanup, files_list)
assert all(results), f'{len(results) - sum(results)} wrong results!'
assert len(results) == len(files_list)
我有一套股票价格历史,文件夹'Raw'下包含8160个文件。我不是使用 pandas 对它们中的每一个进行一些调整,而是将新文件输出到另一个文件夹 'Cleaned'。任何处理失败的文件,应写入名为 'Failed'.
的文件夹中所有文件都是 csv。
我运行下面的代码后,发现下面没有文件失败。但是大约有 100 个文件丢失了。 'Clean' 文件夹仅包含 8060 个文件。
我检查了差异并找出了丢失的文件。我发现所有文件的格式都是正确的,如果我用 for 循环的方式可以正常处理。所以我怀疑可能是我没有正确使用多进程。
这是代码。
def cleanup(file):
print('Working on ' + file)
stock = pd.read_csv(raw_folder_path + file)
try:
del stock['Unnamed: 0']
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock.to_csv(clean_folder_path + file)
except:
print(file + ' Not Successfull')
stock.to_csv(failed_folder_path + file)
#run multiprocessing
files_list = os.listdir(raw_folder_path)
pool = multiprocessing.Pool()
pool.map(cleanup, files_list)
我希望有人能告诉我我做错了什么?谢谢!
我已将建议放入问题评论中。最终代码大概应该是这样的:
import multiprocessing, os, datetime
import pandas as pd
raw_folder_path = './input/'
clean_folder_path = './clean/'
failed_folder_path = './failed/'
def cleanup(file):
print('Working on ' + file)
stock = pd.read_csv(raw_folder_path + file)
try:
del stock['Unnamed: 0']
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock.to_csv(clean_folder_path + file)
assert os.path.exists(clean_folder_path + file)
return True
except Exception as ex:
print(file + ' Not Successfull ', ex)
stock.to_csv(failed_folder_path + file)
assert os.path.exists(failed_folder_path + file)
return False
if __name__ == '__main__':
files_list = os.listdir(raw_folder_path)
with multiprocessing.Pool() as pool:
results = pool.map(cleanup, files_list)
assert all(results), f'{len(results) - sum(results)} wrong results!'
assert len(results) == len(files_list)