如何使用 Dask 从 google 云存储中读取多个大型 CSV 文件的块,而不会同时使内存过载
How to read chunks of multiple large CSV files from google cloud storage using Dask without overloading the memory all at once
我正在尝试从 google 存储中读取一堆大型 csv 文件(多个文件)。我使用 Dask 分发库进行并行计算,但我在这里面临的问题是,虽然我提到了 blocksize (100mb),但我我不确定如何逐个读取分区并将其保存到我的 postgres 数据库中,这样我就不想让我的内存过载。
from dask.distributed import Client
from dask.diagnostics import ProgressBar
client = Client(processes=False)
import dask.dataframe as dd
def read_csv_gcs():
with ProgressBar():
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
pd = df.compute(scheduler='threads')
return pd
def write_df_to_db(df):
try:
from sqlalchemy import create_engine
engine = create_engine('postgresql://usr:pass@localhost:5432/sampledb')
df.to_sql('sampletable', engine, if_exists='replace',index=False)
except Exception as e:
print(e)
pass
pd = read_csv_gcs()
write_df_to_db(pd)
以上代码是我的基本实现,但如前所述,我想分块读取它并更新数据库。像
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
for chunk in df:
write_it_to_db(chunk)
是否可以在 Dask 中完成? 还是我应该使用 pandas 的块大小并进行迭代,然后将其保存到数据库(但我想念并行此处计算)?
有人可以解释一下吗?
这一行
df.compute(scheduler='threads')
说:在工作线程中以块的形式加载数据,并将它们全部连接成一个 in-memory 数据帧,df
。这不是你想要的。您想要插入块,然后 从内存中删除它们。
您可能想使用 map_partitions
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
df.map_partitions(write_it_to_db).compute()
或使用df.to_delayed()
.
请注意,根据您的 SQL 驱动程序,您可能无法通过这种方式获得并行性,如果不能,pandas iter-chunk 方法会像嗯。
我正在尝试从 google 存储中读取一堆大型 csv 文件(多个文件)。我使用 Dask 分发库进行并行计算,但我在这里面临的问题是,虽然我提到了 blocksize (100mb),但我我不确定如何逐个读取分区并将其保存到我的 postgres 数据库中,这样我就不想让我的内存过载。
from dask.distributed import Client
from dask.diagnostics import ProgressBar
client = Client(processes=False)
import dask.dataframe as dd
def read_csv_gcs():
with ProgressBar():
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
pd = df.compute(scheduler='threads')
return pd
def write_df_to_db(df):
try:
from sqlalchemy import create_engine
engine = create_engine('postgresql://usr:pass@localhost:5432/sampledb')
df.to_sql('sampletable', engine, if_exists='replace',index=False)
except Exception as e:
print(e)
pass
pd = read_csv_gcs()
write_df_to_db(pd)
以上代码是我的基本实现,但如前所述,我想分块读取它并更新数据库。像
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
for chunk in df:
write_it_to_db(chunk)
是否可以在 Dask 中完成? 还是我应该使用 pandas 的块大小并进行迭代,然后将其保存到数据库(但我想念并行此处计算)?
有人可以解释一下吗?
这一行
df.compute(scheduler='threads')
说:在工作线程中以块的形式加载数据,并将它们全部连接成一个 in-memory 数据帧,df
。这不是你想要的。您想要插入块,然后 从内存中删除它们。
您可能想使用 map_partitions
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
df.map_partitions(write_it_to_db).compute()
或使用df.to_delayed()
.
请注意,根据您的 SQL 驱动程序,您可能无法通过这种方式获得并行性,如果不能,pandas iter-chunk 方法会像嗯。