使用 Pandas 的多处理读取、修改和写入数千个 csv 文件
Using multiprocessing with Pandas to read, modify and write thousands csv files
所以我在一个目录下有大约5000个csv文件,其中包含股票的分钟数据。每个文件都由它们的符号命名。像股票 AAPL 被命名为 AAPL.csv.
我尝试对它们中的每一个进行一些清理和编辑。在这种情况下,我尝试将包含 unix 纪元数据时间的一列转换为可读的日期和时间。我也想更改一列的标签。
我尝试使用多处理来加快进程。但首先尝试干掉我的 Macbook。
我 运行 它在 VScode 的 jupyter notebook 里面。如果这很重要。
我想知道我做错了什么以及如何改进。以及如何处理 python 和 pandas 中的类似任务。
谢谢!
这是我的代码。
# Define operations will be used in multiprocessing handling
def clean_up(file,fail_list):
print('Working on {}'.format(file))
stock = pd.read_csv('./Data/minutes_data/' + file)
try:
#Convert datetime columns into readable date and time column
stock['Date'] = stock.apply(lambda row: epoch_converter.get_date_from_mill_epoch(row['datetime']), axis=1)
stock['Time'] = stock.apply(lambda row: epoch_converter.get_time_from_mill_epoch(row['datetime']), axis=1)
#Rename 'Unnamed: 0' column into 'Minute'
stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)
#Write it back to new file
stock.to_csv('./Data/working_data/' + file)
except:
print('{} not successful'.format(file))
fail_list = fail_list.append(file)
fail_list.to_csv('./failed_list.csv')
#Get file list to working on.
file_list = os.listdir('./Data/minutes_data/')
#prepare failed_list
fail_list = pd.DataFrame([])
#Loop through each file
processes = []
for file in file_list:
p = multiprocessing.Process(target=clean_up, args=(file,fail_list,))
processes.append(p)
p.start()
for process in processes:
process.join()
更新:CSV_FILE_SAMPLE
、开盘价、最高价、最低价、收盘价、交易量、日期时间
0,21.9,21.9,21.9,21.9,200,1596722940000
0,20.0,20.0,19.9937,19.9937,200,1595266500000
1,20.0,20.0,19.9937,19.9937,500,1595266800000
2,20.0,20.0,19.9937,19.9937,1094,1595267040000
3,20.0,20.0,20.0,20.0,200,1595268240000
最终更新:
结合@furas 和@jsmart 的回答,该脚本设法将 5000 csv 的处理时间从几小时减少到不到 1 分钟(在 Macbook pro 上的 6 核 i9 下)。我很高兴。你们真棒。谢谢!
最终脚本在这里:
import pandas as pd
import numpy as np
import os
import multiprocessing
import logging
logging.basicConfig(filename='./log.log',level=logging.DEBUG)
file_list = os.listdir('./Data/minutes_data/')
def cleanup(file):
print('Working on ' + file)
stock = pd.read_csv('./Data/minutes_data/' + file)
try:
#Convert datetime columns into readable date and time column
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock['Time'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.time
#Rename 'Unnamed: 0' column into 'Minute'
stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)
#Write it back to new file
stock.to_csv('./Data/working_data/' + file)
except:
print(file + ' Not successful')
logging.warning(file + ' Not complete.')
pool = multiprocessing.Pool()
pool.map(cleanup, file_list)
在循环中使用Process
同时创建5000个进程
您可以使用 Pool
来控制同时工作的进程数 - 它会自动释放下一个文件的进程。
它也可以使用return
将失败文件的名称发送到主进程并且它可以保存文件一次。在多个进程中使用同一个文件可能会在该文件中产生错误的数据。此外,进程不共享变量,每个进程都有自己的空 DataFrame,稍后将只保存自己的失败文件 - 因此它将删除以前的内容。
def clean_up(file):
# ... code ...
return None # if OK
except:
return file # if failed
# --- main ---
# get file list to working on.
file_list = sorted(os.listdir('./Data/minutes_data/'))
with multiprocessing.Pool(10) as p:
failed_files = p.map(clean_up, file_list)
# remove None from names
failed_files = filter(None, failed_files)
# save all
df = pd.DataFrame(failed_files)
df.to_csv('./failed_list.csv')
还有multiprocessing.pool.ThreadPool
,它使用threads
而不是processes
。
模块 concurrent.futures 还有 ThreadPoolExecutor
和 ProcessPoolExecutor
您也可以尝试使用外部模块 - 但我不记得哪个有用。
原 post 问“...如何处理 python 和 pandas 中的类似任务。”
- 替换
.apply(..., axis=1)
可以将吞吐量提高 100 倍或更好。
- 这是一个包含 10_000 行数据的示例:
%%timeit
df['date'] = df.apply(lambda x: pd.to_datetime(x['timestamp'], unit='ms'), axis=1)
792 ms ± 26.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Re-write 为:
%%timeit
df['date'] = pd.to_datetime(df['date'], unit='ms')
4.88 ms ± 38.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
示例数据:
print(df['timestamp'].head())
0 1586863008214
1 1286654914895
2 1436424291218
3 1423512988135
4 1413205308057
Name: timestamp, dtype: int64
所以我在一个目录下有大约5000个csv文件,其中包含股票的分钟数据。每个文件都由它们的符号命名。像股票 AAPL 被命名为 AAPL.csv.
我尝试对它们中的每一个进行一些清理和编辑。在这种情况下,我尝试将包含 unix 纪元数据时间的一列转换为可读的日期和时间。我也想更改一列的标签。
我尝试使用多处理来加快进程。但首先尝试干掉我的 Macbook。
我 运行 它在 VScode 的 jupyter notebook 里面。如果这很重要。
我想知道我做错了什么以及如何改进。以及如何处理 python 和 pandas 中的类似任务。
谢谢!
这是我的代码。
# Define operations will be used in multiprocessing handling
def clean_up(file,fail_list):
print('Working on {}'.format(file))
stock = pd.read_csv('./Data/minutes_data/' + file)
try:
#Convert datetime columns into readable date and time column
stock['Date'] = stock.apply(lambda row: epoch_converter.get_date_from_mill_epoch(row['datetime']), axis=1)
stock['Time'] = stock.apply(lambda row: epoch_converter.get_time_from_mill_epoch(row['datetime']), axis=1)
#Rename 'Unnamed: 0' column into 'Minute'
stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)
#Write it back to new file
stock.to_csv('./Data/working_data/' + file)
except:
print('{} not successful'.format(file))
fail_list = fail_list.append(file)
fail_list.to_csv('./failed_list.csv')
#Get file list to working on.
file_list = os.listdir('./Data/minutes_data/')
#prepare failed_list
fail_list = pd.DataFrame([])
#Loop through each file
processes = []
for file in file_list:
p = multiprocessing.Process(target=clean_up, args=(file,fail_list,))
processes.append(p)
p.start()
for process in processes:
process.join()
更新:CSV_FILE_SAMPLE
、开盘价、最高价、最低价、收盘价、交易量、日期时间 0,21.9,21.9,21.9,21.9,200,1596722940000 0,20.0,20.0,19.9937,19.9937,200,1595266500000 1,20.0,20.0,19.9937,19.9937,500,1595266800000 2,20.0,20.0,19.9937,19.9937,1094,1595267040000 3,20.0,20.0,20.0,20.0,200,1595268240000
最终更新:
结合@furas 和@jsmart 的回答,该脚本设法将 5000 csv 的处理时间从几小时减少到不到 1 分钟(在 Macbook pro 上的 6 核 i9 下)。我很高兴。你们真棒。谢谢!
最终脚本在这里:
import pandas as pd
import numpy as np
import os
import multiprocessing
import logging
logging.basicConfig(filename='./log.log',level=logging.DEBUG)
file_list = os.listdir('./Data/minutes_data/')
def cleanup(file):
print('Working on ' + file)
stock = pd.read_csv('./Data/minutes_data/' + file)
try:
#Convert datetime columns into readable date and time column
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock['Time'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.time
#Rename 'Unnamed: 0' column into 'Minute'
stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)
#Write it back to new file
stock.to_csv('./Data/working_data/' + file)
except:
print(file + ' Not successful')
logging.warning(file + ' Not complete.')
pool = multiprocessing.Pool()
pool.map(cleanup, file_list)
在循环中使用Process
同时创建5000个进程
您可以使用 Pool
来控制同时工作的进程数 - 它会自动释放下一个文件的进程。
它也可以使用return
将失败文件的名称发送到主进程并且它可以保存文件一次。在多个进程中使用同一个文件可能会在该文件中产生错误的数据。此外,进程不共享变量,每个进程都有自己的空 DataFrame,稍后将只保存自己的失败文件 - 因此它将删除以前的内容。
def clean_up(file):
# ... code ...
return None # if OK
except:
return file # if failed
# --- main ---
# get file list to working on.
file_list = sorted(os.listdir('./Data/minutes_data/'))
with multiprocessing.Pool(10) as p:
failed_files = p.map(clean_up, file_list)
# remove None from names
failed_files = filter(None, failed_files)
# save all
df = pd.DataFrame(failed_files)
df.to_csv('./failed_list.csv')
还有multiprocessing.pool.ThreadPool
,它使用threads
而不是processes
。
模块 concurrent.futures 还有 ThreadPoolExecutor
和 ProcessPoolExecutor
您也可以尝试使用外部模块 - 但我不记得哪个有用。
原 post 问“...如何处理 python 和 pandas 中的类似任务。”
- 替换
.apply(..., axis=1)
可以将吞吐量提高 100 倍或更好。 - 这是一个包含 10_000 行数据的示例:
%%timeit
df['date'] = df.apply(lambda x: pd.to_datetime(x['timestamp'], unit='ms'), axis=1)
792 ms ± 26.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Re-write 为:
%%timeit
df['date'] = pd.to_datetime(df['date'], unit='ms')
4.88 ms ± 38.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
示例数据:
print(df['timestamp'].head())
0 1586863008214
1 1286654914895
2 1436424291218
3 1423512988135
4 1413205308057
Name: timestamp, dtype: int64