使用 python 读取文件夹中的多个镶木地板文件并写入单个 csv 文件
Read multiple parquet files in a folder and write to single csv file using python
我是 python 的新手,我有一个场景,其中有多个 parquet 文件,文件名按顺序排列。例如:par_file1、par_file2、par_file3 等等一个文件夹中最多 100 个文件。
我需要从 file1 开始依次读取这些 parquet 文件并将其写入单个 csv 文件。写入文件 1 的内容后,文件 2 的内容应附加到相同的 csv 中,而无需 header。请注意,所有文件都具有相同的列名,只有数据被拆分到多个文件中。
我学会了使用 pyarrow 将单个镶木地板转换为 csv 文件,代码如下:
import pandas as pd
df = pd.read_parquet('par_file.parquet')
df.to_csv('csv_file.csv')
但我无法将其扩展为循环多个 parquet 文件并附加到单个 csv。
pandas 中有方法可以做到这一点吗?或任何其他方式来做到这一点会有很大的帮助。谢谢。
如果您要将文件复制到您的本地计算机和 运行 您的代码,您可以这样做。下面的代码假设您 运行 将代码放在与 parquet 文件相同的目录中。它还假设文件的命名如上所示:"order. ex: par_file1,par_file2,par_file3 and so on upto 100 files in a folder." 如果您需要搜索文件,则需要使用 glob
获取文件名并明确提供要保存的路径csv:open(r'this\is\your\path\to\csv_file.csv', 'a')
希望这有帮助。
import pandas as pd
# Create an empty csv file and write the first parquet file with headers
with open('csv_file.csv','w') as csv_file:
print('Reading par_file1.parquet')
df = pd.read_parquet('par_file1.parquet')
df.to_csv(csv_file, index=False)
print('par_file1.parquet appended to csv_file.csv\n')
csv_file.close()
# create your file names and append to an empty list to look for in the current directory
files = []
for i in range(2,101):
files.append(f'par_file{i}.parquet')
# open files and append to csv_file.csv
for f in files:
print(f'Reading {f}')
df = pd.read_parquet(f)
with open('csv_file.csv','a') as file:
df.to_csv(file, header=False, index=False)
print(f'{f} appended to csv_file.csv\n')
如果需要,您可以删除打印语句。
在 python 3.6
中使用 pandas 0.23.3
进行了测试
我 运行 进入这个问题,想看看 pandas 是否可以本地读取分区镶木地板数据集。我不得不说当前的答案不必要地冗长(使其难以解析)。我还认为不断 opening/closing 文件句柄然后根据大小扫描到它们的末尾并不是特别有效。
更好的选择是将所有 parquet 文件读取到一个 DataFrame 中,然后写入一次:
from pathlib import Path
import pandas as pd
data_dir = Path('dir/to/parquet/files')
full_df = pd.concat(
pd.read_parquet(parquet_file)
for parquet_file in data_dir.glob('*.parquet')
)
full_df.to_csv('csv_file.csv')
或者,如果您真的只想追加到文件:
data_dir = Path('dir/to/parquet/files')
for i, parquet_path in enumerate(data_dir.glob('*.parquet')):
df = pd.read_parquet(parquet_path)
write_header = i == 0 # write header only on the 0th file
write_mode = 'w' if i == 0 else 'a' # 'write' mode for 0th file, 'append' otherwise
df.to_csv('csv_file.csv', mode=write_mode, header=write_header)
附加每个文件的最终替代方案,该文件在开始时以 "a+"
模式打开目标 CSV 文件,将文件句柄扫描到每个 write/append 的文件末尾(我相信这有效,但 实际上 没有测试过):
data_dir = Path('dir/to/parquet/files')
with open('csv_file.csv', "a+") as csv_handle:
for i, parquet_path in enumerate(data_dir.glob('*.parquet')):
df = pd.read_parquet(parquet_path)
write_header = i == 0 # write header only on the 0th file
df.to_csv(csv_handle, header=write_header)
这帮助我将所有镶木地板文件加载到一个数据框中
import glob
files = glob.glob("*.snappy.parquet")
data = [pd.read_parquet(f,engine='fastparquet') for f in files]
merged_data = pd.concat(data,ignore_index=True)
对于那些试图读取远程文件的人来说,这是一个小改动,这有助于更快地读取它(直接 read_parquet 读取远程文件对我来说这样做要慢得多):
import io
merged = []
# remote_reader = ... <- init some remote reader, for example AzureDLFileSystem()
for f in files:
with remote_reader.open(f, 'rb') as f_reader:
merged.append(remote_reader.read())
merged = pd.concat((pd.read_parquet(io.BytesIO(file_bytes)) for file_bytes in merged))
虽然增加了一些临时内存开销。
我有类似的需求,我读到当前 Pandas 版本支持目录路径作为 read_csv 函数的参数。所以你可以像这样读取多个镶木地板文件:
import pandas as pd
df = pd.read_parquet('path/to/the/parquet/files/directory')
它将所有内容连接到一个数据帧中,因此您可以在之后立即将其转换为 csv:
df.to_csv('csv_file.csv')
确保您根据文档具有以下依赖项:
- pyarrow
- fastparquet
您可以使用 Dask 读取多个 Parquet 文件并将它们写入单个 CSV。
Dask 接受星号 (*) 作为通配符/全局字符以匹配相关文件名。
确保在写入 CSV 文件时将 single_file
设置为 True
,将 index
设置为 False
。
import pandas as pd
import numpy as np
# create some dummy dataframes using np.random and write to separate parquet files
rng = np.random.default_rng()
for i in range(3):
df = pd.DataFrame(rng.integers(0, 100, size=(10, 4)), columns=list('ABCD'))
df.to_parquet(f"dummy_df_{i}.parquet")
# load multiple parquet files with Dask
import dask.dataframe as dd
ddf = dd.read_parquet('dummy_df_*.parquet', index=False)
# write to single csv
ddf.to_csv("dummy_df_all.csv",
single_file=True,
index=False
)
# test to verify
df_test = pd.read_csv("dummy_df_all.csv")
为此使用 Dask 意味着您不必担心生成的文件大小(Dask 是一个分布式计算框架,可以处理您扔给它的任何东西,而 pandas 可能会抛出 MemoryError 如果生成的 DataFrame 太大),您可以轻松地从 Amazon S3 等云数据存储中读取和写入。
我是 python 的新手,我有一个场景,其中有多个 parquet 文件,文件名按顺序排列。例如:par_file1、par_file2、par_file3 等等一个文件夹中最多 100 个文件。
我需要从 file1 开始依次读取这些 parquet 文件并将其写入单个 csv 文件。写入文件 1 的内容后,文件 2 的内容应附加到相同的 csv 中,而无需 header。请注意,所有文件都具有相同的列名,只有数据被拆分到多个文件中。
我学会了使用 pyarrow 将单个镶木地板转换为 csv 文件,代码如下:
import pandas as pd
df = pd.read_parquet('par_file.parquet')
df.to_csv('csv_file.csv')
但我无法将其扩展为循环多个 parquet 文件并附加到单个 csv。 pandas 中有方法可以做到这一点吗?或任何其他方式来做到这一点会有很大的帮助。谢谢。
如果您要将文件复制到您的本地计算机和 运行 您的代码,您可以这样做。下面的代码假设您 运行 将代码放在与 parquet 文件相同的目录中。它还假设文件的命名如上所示:"order. ex: par_file1,par_file2,par_file3 and so on upto 100 files in a folder." 如果您需要搜索文件,则需要使用 glob
获取文件名并明确提供要保存的路径csv:open(r'this\is\your\path\to\csv_file.csv', 'a')
希望这有帮助。
import pandas as pd
# Create an empty csv file and write the first parquet file with headers
with open('csv_file.csv','w') as csv_file:
print('Reading par_file1.parquet')
df = pd.read_parquet('par_file1.parquet')
df.to_csv(csv_file, index=False)
print('par_file1.parquet appended to csv_file.csv\n')
csv_file.close()
# create your file names and append to an empty list to look for in the current directory
files = []
for i in range(2,101):
files.append(f'par_file{i}.parquet')
# open files and append to csv_file.csv
for f in files:
print(f'Reading {f}')
df = pd.read_parquet(f)
with open('csv_file.csv','a') as file:
df.to_csv(file, header=False, index=False)
print(f'{f} appended to csv_file.csv\n')
如果需要,您可以删除打印语句。
在 python 3.6
中使用 pandas 0.23.3
我 运行 进入这个问题,想看看 pandas 是否可以本地读取分区镶木地板数据集。我不得不说当前的答案不必要地冗长(使其难以解析)。我还认为不断 opening/closing 文件句柄然后根据大小扫描到它们的末尾并不是特别有效。
更好的选择是将所有 parquet 文件读取到一个 DataFrame 中,然后写入一次:
from pathlib import Path
import pandas as pd
data_dir = Path('dir/to/parquet/files')
full_df = pd.concat(
pd.read_parquet(parquet_file)
for parquet_file in data_dir.glob('*.parquet')
)
full_df.to_csv('csv_file.csv')
或者,如果您真的只想追加到文件:
data_dir = Path('dir/to/parquet/files')
for i, parquet_path in enumerate(data_dir.glob('*.parquet')):
df = pd.read_parquet(parquet_path)
write_header = i == 0 # write header only on the 0th file
write_mode = 'w' if i == 0 else 'a' # 'write' mode for 0th file, 'append' otherwise
df.to_csv('csv_file.csv', mode=write_mode, header=write_header)
附加每个文件的最终替代方案,该文件在开始时以 "a+"
模式打开目标 CSV 文件,将文件句柄扫描到每个 write/append 的文件末尾(我相信这有效,但 实际上 没有测试过):
data_dir = Path('dir/to/parquet/files')
with open('csv_file.csv', "a+") as csv_handle:
for i, parquet_path in enumerate(data_dir.glob('*.parquet')):
df = pd.read_parquet(parquet_path)
write_header = i == 0 # write header only on the 0th file
df.to_csv(csv_handle, header=write_header)
这帮助我将所有镶木地板文件加载到一个数据框中
import glob
files = glob.glob("*.snappy.parquet")
data = [pd.read_parquet(f,engine='fastparquet') for f in files]
merged_data = pd.concat(data,ignore_index=True)
对于那些试图读取远程文件的人来说,这是一个小改动,这有助于更快地读取它(直接 read_parquet 读取远程文件对我来说这样做要慢得多):
import io
merged = []
# remote_reader = ... <- init some remote reader, for example AzureDLFileSystem()
for f in files:
with remote_reader.open(f, 'rb') as f_reader:
merged.append(remote_reader.read())
merged = pd.concat((pd.read_parquet(io.BytesIO(file_bytes)) for file_bytes in merged))
虽然增加了一些临时内存开销。
我有类似的需求,我读到当前 Pandas 版本支持目录路径作为 read_csv 函数的参数。所以你可以像这样读取多个镶木地板文件:
import pandas as pd
df = pd.read_parquet('path/to/the/parquet/files/directory')
它将所有内容连接到一个数据帧中,因此您可以在之后立即将其转换为 csv:
df.to_csv('csv_file.csv')
确保您根据文档具有以下依赖项:
- pyarrow
- fastparquet
您可以使用 Dask 读取多个 Parquet 文件并将它们写入单个 CSV。
Dask 接受星号 (*) 作为通配符/全局字符以匹配相关文件名。
确保在写入 CSV 文件时将 single_file
设置为 True
,将 index
设置为 False
。
import pandas as pd
import numpy as np
# create some dummy dataframes using np.random and write to separate parquet files
rng = np.random.default_rng()
for i in range(3):
df = pd.DataFrame(rng.integers(0, 100, size=(10, 4)), columns=list('ABCD'))
df.to_parquet(f"dummy_df_{i}.parquet")
# load multiple parquet files with Dask
import dask.dataframe as dd
ddf = dd.read_parquet('dummy_df_*.parquet', index=False)
# write to single csv
ddf.to_csv("dummy_df_all.csv",
single_file=True,
index=False
)
# test to verify
df_test = pd.read_csv("dummy_df_all.csv")
为此使用 Dask 意味着您不必担心生成的文件大小(Dask 是一个分布式计算框架,可以处理您扔给它的任何东西,而 pandas 可能会抛出 MemoryError 如果生成的 DataFrame 太大),您可以轻松地从 Amazon S3 等云数据存储中读取和写入。